直接设置 Kafka 库选项

Vertica 依赖于开源的 rdkafka 库与 Apache Kafka 通信。此库包含用于控制 Vertica 和 Kafka 如何交互的许多选项。可以通过 vkconfig 实用程序和 Kafka 集成函数(如 KafkaSource)中的设置来设置最常见的 rdkafka 库选项。

有些 rdkafka 设置不能直接从 Vertica 中进行设置。一般情况下,您不需要更改它们。但是,如果您发现需要设置一个不能直接从 Vertica 进行的特定 rdkafka 设置,可以将相关选项通过 kafka_conf 选项直接传递到 rdkafka 库。

当使用调度程序从 Kafka 加载数据时,支持 kafka_conf 实参。可以通过以下方式设置这些值(按照优先级从低到高的顺序列出):

  • 在运行 vkconfig 实用程序的主机上设置的 Linux 环境变量 VERTICA_RDKAFKA_CONF

  • 在运行 vkconfig 实用程序的主机上设置的 Linux 环境变量 VERTICA_RDKAFKA_CONF_KAFKA_CLUSTER。该变量名称的 KAFKA_CLUSTER 部分是您已经使用 vkconfig 的群集实用程序定义的 Kafka 群集的名称。该环境变量中的设置仅影响在 KAFKA_CLUSTER 中指定的特定 Kafka 群集。

  • vkconfig 实用程序的 --kafka_conf 选项。可以在 clustersourcelaunch 和同步工具中设置此选项。请注意,该设置仅适用于每个 vkconfig 实用程序调用,而不会延续到其他 vkconfig 实用程序调用。例如,如果您需要为 cluster 和 source 工具提供一个选项,那么必须同时为它们提供 kafka_conf 选项。

所有这些选项都是级联的,因此,如果将使用 --kafka_conf 实参的选项设置为群集工具,在环境变量中设置的相同选项会被覆盖。

当直接调用 KafkaExport、KafkaSource 和多个其他 Kafka 集成函数时,也可以直接设置 rdkafka 选项。这些函数接受名为 kafka_conf 的参数。

kafka_conf 选项设置

kafka_conf vkconfig 选项接受一个 JSON 对象,设置如下格式:

  • 一个或多个选项/值对:

    --kafka_conf '{"option1":value1[, "option2":value2...]}'
    
  • 带有多个值的单个选项:

    --kafka_conf '{"option1":"value1[;value2...]"}'
    

有关 rdkafka 库支持的配置选项列表,请参阅 GitHub 上的 rdkafka 项目

示例

以下示例演示了如何在使用 KafkaSource 手动加载消息时禁用 rdkafka api.version.request 的选项。当访问运行 0.9 或更早版本的 Kafka 群集时,应该禁用此选项。有关详细信息,请参阅为 Apache Kafka 0.9 及更早版本配置 Vertica

=> CREATE FLEX TABLE iot_data();
CREATE TABLE
=> COPY public.iot_data SOURCE KafkaSource(stream='iot_json|0|-2',
                                           brokers='kafka-01.example.com:9092',
                                           stop_on_eof=True,
                                           kafka_conf='{"api.version.request":false}')
        PARSER KafkaJSONParser();
 Rows Loaded
-------------
        5000
(1 row)

以下示例演示了如何在调用群集工具时使用 JSON 对象设置两个选项。它禁用 api.version.request 选项,并使用 check.crcs 选项启用来自 Kafka 的消息的 CRC 检查:

$ vkconfig cluster --create --cluster StreamCluster1 \
                   --hosts kafka01.example.com:9092,kafka02.example.com:9092 \
                   --conf myscheduler.config \
                   --kafka_conf '{"api.version.request":false, "check.crcs":true}'

以下示例演示了如何使用环境变量设置相同的选项:

$ export VERTICA_RDKAFKA_CONF='{"api.version.request":false, "check.crcs":true}'
$ vkconfig cluster --create --cluster StreamCluster1 \
                   --hosts kafka01.example.com:9092,kafka02.example.com:9092 \
                   --conf myscheduler.config