直接设置 Kafka 库选项
Vertica 依赖于开源的 rdkafka 库与 Apache Kafka 通信。此库包含用于控制 Vertica 和 Kafka 如何交互的许多选项。可以通过 vkconfig 实用程序和 Kafka 集成函数(如 KafkaSource)中的设置来设置最常见的 rdkafka 库选项。
有些 rdkafka 设置不能直接从 Vertica 中进行设置。一般情况下,您不需要更改它们。但是,如果您发现需要设置一个不能直接从 Vertica 进行的特定 rdkafka 设置,可以将相关选项通过 kafka_conf 选项直接传递到 rdkafka 库。
当使用调度程序从 Kafka 加载数据时,支持 kafka_conf 实参。可以通过以下方式设置这些值(按照优先级从低到高的顺序列出):
-
在运行 vkconfig 实用程序的主机上设置的 Linux 环境变量
VERTICA_RDKAFKA_CONF
。 -
在运行 vkconfig 实用程序的主机上设置的 Linux 环境变量
VERTICA_RDKAFKA_CONF_
KAFKA_CLUSTER。该变量名称的 KAFKA_CLUSTER 部分是您已经使用 vkconfig 的群集实用程序定义的 Kafka 群集的名称。该环境变量中的设置仅影响在 KAFKA_CLUSTER 中指定的特定 Kafka 群集。 -
vkconfig 实用程序的
--kafka_conf
选项。可以在 cluster、source、launch 和同步工具中设置此选项。请注意,该设置仅适用于每个 vkconfig 实用程序调用,而不会延续到其他 vkconfig 实用程序调用。例如,如果您需要为 cluster 和 source 工具提供一个选项,那么必须同时为它们提供 kafka_conf 选项。
注意
使用环境变量设置 rdkafka 选项有助于保持设置的一致性。很容易忘记为 vkconfig 脚本的每次调用设置--kafka_conf
选项。
所有这些选项都是级联的,因此,如果将使用 --kafka_conf
实参的选项设置为群集工具,在环境变量中设置的相同选项会被覆盖。
当直接调用 KafkaExport、KafkaSource 和多个其他 Kafka 集成函数时,也可以直接设置 rdkafka 选项。这些函数接受名为 kafka_conf
的参数。
kafka_conf 选项设置
kafka_conf vkconfig 选项接受一个 JSON 对象,设置如下格式:
-
一个或多个选项/值对:
--kafka_conf '{"option1":value1[, "option2":value2...]}'
-
带有多个值的单个选项:
--kafka_conf '{"option1":"value1[;value2...]"}'
有关 rdkafka 库支持的配置选项列表,请参阅 GitHub 上的 rdkafka 项目。
重要
如果通过 kafka_conf 随意设置选项,可能会导致错误或不可预知的行为。如果在使用 kafka_conf 选项设置 rdkafka 选项后遇到加载消息的问题,请回退更改,查看这是否为问题的来源。为了防止混淆,永远不要通过可以直接通过调度程序选项来设置的 kafka_conf 参数设置选项。例如,不要使用 kafka_conf 选项设置 Kafka 的 message.max.bytes 设置,请改用 load-spec 工具的 --message-max-bytes 选项。
示例
以下示例演示了如何在使用 KafkaSource 手动加载消息时禁用 rdkafka api.version.request
的选项。当访问运行 0.9 或更早版本的 Kafka 群集时,应该禁用此选项。有关详细信息,请参阅为 Apache Kafka 0.9 及更早版本配置 Vertica。
=> CREATE FLEX TABLE iot_data();
CREATE TABLE
=> COPY public.iot_data SOURCE KafkaSource(stream='iot_json|0|-2',
brokers='kafka-01.example.com:9092',
stop_on_eof=True,
kafka_conf='{"api.version.request":false}')
PARSER KafkaJSONParser();
Rows Loaded
-------------
5000
(1 row)
以下示例演示了如何在调用群集工具时使用 JSON 对象设置两个选项。它禁用 api.version.request
选项,并使用 check.crcs
选项启用来自 Kafka 的消息的 CRC 检查:
$ vkconfig cluster --create --cluster StreamCluster1 \
--hosts kafka01.example.com:9092,kafka02.example.com:9092 \
--conf myscheduler.config \
--kafka_conf '{"api.version.request":false, "check.crcs":true}'
以下示例演示了如何使用环境变量设置相同的选项:
$ export VERTICA_RDKAFKA_CONF='{"api.version.request":false, "check.crcs":true}'
$ vkconfig cluster --create --cluster StreamCluster1 \
--hosts kafka01.example.com:9092,kafka02.example.com:9092 \
--conf myscheduler.config
重要
设置check.crc
选项只是一个示例。正常情况下,Vertica 不建议在调度程序中启用 CRC 检查。这会增加额外的开销,并且可能导致性能变慢。