为 Apache Kafka 0.9 及更早版本配置 Vertica

Apache Kafka 0.10 版本引入了一个新功能,它让使用者能够决定 Kafka 代理支持哪个版本的 Kafka API。支持该功能的使用者会向 Kafka 代理发送一个初始 API 版本查询,以确定与它们通信时使用的 API 版本。运行 0.9.0 或更早版本的 Kafka 代理无法响应此 API 查询。如果使用者在超时时间(默认设置为 10 秒)内没有收到来自 Kafka 代理的回复,则使用者可以假定 Kafka 代理运行的是 Kafka 0.9.0 或更早版本。

Vertica 与 Kafka 的集成从 9.1.1 版本开始支持此 API 查询功能。如果将 Vertica 连接到运行 0.9.0 或更早版本的 Kafka 群集,此 API 检查可能会导致问题。您可能会注意到从 Kafka 加载消息的性能较差,并且可能会遇到错误,因为 10 秒的 API 请求超时会导致部分 Kafka 集成功能超时并报告错误。

例如,如果在 Kafka 0.9 群集上通过运行 vkconfig 源实用程序来配置源,则可能会出现以下错误:

$ vkconfig source --create --conf weblog.conf --cluster kafka_weblog --source web_hits
Exception in thread "main" com.vertica.solutions.kafka.exception.ConfigurationException:
ERROR: [[Vertica][VJDBC](5861) ERROR: Error calling processPartition() in
User Function KafkaListTopics at [/data/build-centos6/qb/buildagent/workspace/jenkins2/PrimaryBuilds/build_master/build/udx/supported/kafka/KafkaUtil.cpp:173],
error code: 0, message: Error getting metadata: [Local: Broker transport failure]]
at com.vertica.solutions.kafka.model.StreamSource.validateConfiguration(StreamSource.java:184)
at com.vertica.solutions.kafka.model.StreamSource.setFromMapAndValidate(StreamSource.java:130)
at com.vertica.solutions.kafka.model.StreamModel.<init>(StreamModel.java:89)
at com.vertica.solutions.kafka.model.StreamSource.<init>(StreamSource.java:39)
at com.vertica.solutions.kafka.cli.SourceCLI.getNewModel(SourceCLI.java:53)
at com.vertica.solutions.kafka.cli.SourceCLI.getNewModel(SourceCLI.java:15)
at com.vertica.solutions.kafka.cli.CLI.run(CLI.java:56)
at com.vertica.solutions.kafka.cli.CLI._main(CLI.java:132)
at com.vertica.solutions.kafka.cli.SourceCLI.main(SourceCLI.java:25)
Caused by: java.sql.SQLNonTransientException: [Vertica][VJDBC](5861)
ERROR: Error calling processPartition() in User Function KafkaListTopics at [/data/build-centos6/qb/buildagent/workspace/jenkins2/PrimaryBuilds/build_master/build/udx/supported/kafka/KafkaUtil.cpp:173],
error code: 0, message: Error getting metadata: [Local: Broker transport failure]
at com.vertica.util.ServerErrorData.buildException(Unknown Source)
                           . . .

使用流作业调度程序时禁用 API 版本请求

为了避免出现这些问题,必须禁用 Vertica 用于与 Kafka 通信的 rdkafka 库中的 API 版本请求功能。要想在使用调度程序时禁用此设置,最简单的方法是在运行 vkconfig 脚本的主机上设置一个 Linux 环境变量。使用该环境变量可以确保对 vkconfig 的每次调用都包含禁用 API 版本请求的设置。vkconfig 脚本检查以下两个变量:

  • VERTICA_RDKAFKA_CONF 适用于所有与主机上运行的调度程序通信的 Kafka 群集。

  • VERTICA_RDKAFKA_CONF_CLUSTER_NAME 只适用于名为 CLUSTER_NAME 的 Kafka 群集。如果您的调度程序与多个 Kafka 群集通信,其中一些没有运行 0.9 或更早版本,那么请使用此变量。

如果同时设置这两个变量,则特定于群集的变量优先。如果调度程序所连接到的大多数群集运行的是 Kafka 0.9 或更早版本,但少数运行的是 0.10 或更高版本,那么此功能非常有用。在这种情况下,可以使用 VERTICA_RDKAFKA_CONF 变量禁用大多数群集的 API 版本检查,并使用 VERTICA_RDKAFKA_CONF_CLUSTER_NAME 为特定群集重新启用检查。

如果只有少数 Kafka 群集运行 0.9 或更早版本,可以为它们设置特定于群集的变量,并为大多数群集保留默认值。

环境变量的内容是一个 JSON 对象,它通知 rdkafka 库是否查询 Kafka 群集,看群集中是否存在该对象支持的 API 版本:

'{"api.version.request":false}'

要在 BASH 中设置环境变量,请使用 export 命令:

$ export VERTICA_RDKAFKA_CONF='{"api.version.request":false}'

如果想让该设置仅影响名为 kafka_weblog 的群集,则使用以下命令:

$ export VERTICA_RDKAFKA_CONF_kafka_weblog='{"api.version.request":false}'

可以将该命令添加到任何普通用户环境配置文件(如 ~/.bash_profile)或系统范围的文件(如 /etc/profile)中。必须确保对所有可能运行 vkconfig 脚本的用户(包括由 init 等守护程序进程发出的任何调用)使用相同的设置。还可以在用于设置、配置或启动调度程序的任何脚本中直接包含此命令。

在直接加载消息时禁用 API 版本请求

使用 kafka_conf 参数直接调用 KafkaSource 加载消息时,可以禁用 API 请求:

=> CREATE FLEX TABLE iot_data();
CREATE TABLE
=> COPY public.iot_data SOURCE KafkaSource(stream='iot_json|0|-2',
                                           brokers='kafka-01.example.com:9092',
                                           stop_on_eof=True,
                                           kafka_conf='{"api.version.request":false}')
        PARSER KafkaJSONParser();
 Rows Loaded
-------------
        5000
(1 row)

另请参阅