这是本节的多页打印视图。 点击此处打印.

返回本页常规视图.

配置 Vertica 和 Kafka

Vertica 和 Kafka 都有可用来优化流式传输数据加载的设置。此部分中的主题将介绍这些设置。

1 - Kafka 和 Vertica 配置设置

可以微调 Vertica 和 Kafka 的配置设置,从而优化性能。

Vertica 和 Kafka 集成使用的是 rdkafka 版本 0.11.6。除非另有说明,否则以下配置设置使用默认的 librdkafka 配置属性值。有关如何覆盖默认值的说明,请参阅直接设置 Kafka 库选项

Vertica 生产者设置

以下设置更改 Vertica 如何使用 KafkaExport 函数和通知程序为 Kafka 生成数据。

queue.buffering.max.messages
Vertica 生产者队列的大小。如果 Vertica 以过快的速度生成过多消息,那么队列可能会填满,从而导致丢弃消息。增加此值会消耗更多内存,但是会减少消息丢失的机会。

默认值

  • KafkaExport:1000

  • 通知程序:10000

queue.buffering.max.ms
Vertica 刷新生产者消息队列的频率。虽然较低的值会减少延迟,但以吞吐量为代价。较高的值会增加吞吐量,但会导致生产者队列(由 queue.buffer.max.messages 设置)更频繁地填满,从而导致丢弃消息。

默认值: 100 ms

message.max.bytes
Kafka 协议请求消息批次的最大大小。此值在源、代理和生产者上都应该相同。
message.send.max.retries
生产者尝试将消息传递给代理的次数。较高的值会增加成功机会。
retry.backoff.ms
Vertica 在重新发送失败的消息前等待的间隔。
request.required.acks
Kafka 在认为消息传递成功之前需要的代理副本确认数。需要确认会增加延迟。移除确认会增加消息丢失的风险。
request.timeout.ms
生产者等待代理响应的间隔。代理响应时间受服务器负载和所需消息确认数的影响。
较高的值会增加延迟。
compression.type
在将数据发送给代理之前用于编码数据的压缩算法。压缩有助于减少 Vertica 生产者的网络占用空间并提高磁盘利用率。Vertica 支持 gzipsnappy

Kafka 代理设置

Kafka 代理接收来自生产者的消息,并在 Kafka 使用者之间分配它们。在代理本身上配置这些设置。这些设置独立于生产者和使用者设置。有关 Apache Kafka 代理设置的详细信息,请参考 Apache Kafka 文档

message.max.bytes
Kafka 协议请求消息批次的最大大小。此值在源、代理和生产者上应该相同。
num.io.threads
代理用于接收和处理请求的网络线程数。更多的线程可以提高并发性。
num.network.threads
代理用于接受网络请求的网络线程数。更多的线程可以提高并发性。

Vertica 使用者设置

以下设置更改 Vertica 在使用来自 Kafka 的数据时的行为方式。当直接执行 COPY 语句时,可以在 KafkaSource UDL 上使用 kafka_conf 参数设置此值。对于调度程序,请使用调度程序工具中的 --message_max_bytes 设置。

message.max.bytes
Kafka 协议请求消息批次的最大大小。请将该值设置为足够高的值,以防止提取消息批次的开销干扰加载数据。对于新创建的加载规范,默认为 24MB。

2 - 直接设置 Kafka 库选项

Vertica 依赖于开源的 rdkafka 库与 Apache Kafka 通信。此库包含用于控制 Vertica 和 Kafka 如何交互的许多选项。可以通过 vkconfig 实用程序和 Kafka 集成函数(如 KafkaSource)中的设置来设置最常见的 rdkafka 库选项。

有些 rdkafka 设置不能直接从 Vertica 中进行设置。一般情况下,您不需要更改它们。但是,如果您发现需要设置一个不能直接从 Vertica 进行的特定 rdkafka 设置,可以将相关选项通过 kafka_conf 选项直接传递到 rdkafka 库。

当使用调度程序从 Kafka 加载数据时,支持 kafka_conf 实参。可以通过以下方式设置这些值(按照优先级从低到高的顺序列出):

  • 在运行 vkconfig 实用程序的主机上设置的 Linux 环境变量 VERTICA_RDKAFKA_CONF

  • 在运行 vkconfig 实用程序的主机上设置的 Linux 环境变量 VERTICA_RDKAFKA_CONF_KAFKA_CLUSTER。该变量名称的 KAFKA_CLUSTER 部分是您已经使用 vkconfig 的群集实用程序定义的 Kafka 群集的名称。该环境变量中的设置仅影响在 KAFKA_CLUSTER 中指定的特定 Kafka 群集。

  • vkconfig 实用程序的 --kafka_conf 选项。可以在 clustersourcelaunch 和同步工具中设置此选项。请注意,该设置仅适用于每个 vkconfig 实用程序调用,而不会延续到其他 vkconfig 实用程序调用。例如,如果您需要为 cluster 和 source 工具提供一个选项,那么必须同时为它们提供 kafka_conf 选项。

所有这些选项都是级联的,因此,如果将使用 --kafka_conf 实参的选项设置为群集工具,在环境变量中设置的相同选项会被覆盖。

当直接调用 KafkaExport、KafkaSource 和多个其他 Kafka 集成函数时,也可以直接设置 rdkafka 选项。这些函数接受名为 kafka_conf 的参数。

kafka_conf 选项设置

kafka_conf vkconfig 选项接受一个 JSON 对象,设置如下格式:

  • 一个或多个选项/值对:

    --kafka_conf '{"option1":value1[, "option2":value2...]}'
    
  • 带有多个值的单个选项:

    --kafka_conf '{"option1":"value1[;value2...]"}'
    

有关 rdkafka 库支持的配置选项列表,请参阅 GitHub 上的 rdkafka 项目

示例

以下示例演示了如何在使用 KafkaSource 手动加载消息时禁用 rdkafka api.version.request 的选项。当访问运行 0.9 或更早版本的 Kafka 群集时,应该禁用此选项。有关详细信息,请参阅为 Apache Kafka 0.9 及更早版本配置 Vertica

=> CREATE FLEX TABLE iot_data();
CREATE TABLE
=> COPY public.iot_data SOURCE KafkaSource(stream='iot_json|0|-2',
                                           brokers='kafka-01.example.com:9092',
                                           stop_on_eof=True,
                                           kafka_conf='{"api.version.request":false}')
        PARSER KafkaJSONParser();
 Rows Loaded
-------------
        5000
(1 row)

以下示例演示了如何在调用群集工具时使用 JSON 对象设置两个选项。它禁用 api.version.request 选项,并使用 check.crcs 选项启用来自 Kafka 的消息的 CRC 检查:

$ vkconfig cluster --create --cluster StreamCluster1 \
                   --hosts kafka01.example.com:9092,kafka02.example.com:9092 \
                   --conf myscheduler.config \
                   --kafka_conf '{"api.version.request":false, "check.crcs":true}'

以下示例演示了如何使用环境变量设置相同的选项:

$ export VERTICA_RDKAFKA_CONF='{"api.version.request":false, "check.crcs":true}'
$ vkconfig cluster --create --cluster StreamCluster1 \
                   --hosts kafka01.example.com:9092,kafka02.example.com:9092 \
                   --conf myscheduler.config

3 - 为 Apache Kafka 0.9 及更早版本配置 Vertica

Apache Kafka 0.10 版本引入了一个新功能,它让使用者能够决定 Kafka 代理支持哪个版本的 Kafka API。支持该功能的使用者会向 Kafka 代理发送一个初始 API 版本查询,以确定与它们通信时使用的 API 版本。运行 0.9.0 或更早版本的 Kafka 代理无法响应此 API 查询。如果使用者在超时时间(默认设置为 10 秒)内没有收到来自 Kafka 代理的回复,则使用者可以假定 Kafka 代理运行的是 Kafka 0.9.0 或更早版本。

Vertica 与 Kafka 的集成从 9.1.1 版本开始支持此 API 查询功能。如果将 Vertica 连接到运行 0.9.0 或更早版本的 Kafka 群集,此 API 检查可能会导致问题。您可能会注意到从 Kafka 加载消息的性能较差,并且可能会遇到错误,因为 10 秒的 API 请求超时会导致部分 Kafka 集成功能超时并报告错误。

例如,如果在 Kafka 0.9 群集上通过运行 vkconfig 源实用程序来配置源,则可能会出现以下错误:

$ vkconfig source --create --conf weblog.conf --cluster kafka_weblog --source web_hits
Exception in thread "main" com.vertica.solutions.kafka.exception.ConfigurationException:
ERROR: [[Vertica][VJDBC](5861) ERROR: Error calling processPartition() in
User Function KafkaListTopics at [/data/build-centos6/qb/buildagent/workspace/jenkins2/PrimaryBuilds/build_master/build/udx/supported/kafka/KafkaUtil.cpp:173],
error code: 0, message: Error getting metadata: [Local: Broker transport failure]]
at com.vertica.solutions.kafka.model.StreamSource.validateConfiguration(StreamSource.java:184)
at com.vertica.solutions.kafka.model.StreamSource.setFromMapAndValidate(StreamSource.java:130)
at com.vertica.solutions.kafka.model.StreamModel.<init>(StreamModel.java:89)
at com.vertica.solutions.kafka.model.StreamSource.<init>(StreamSource.java:39)
at com.vertica.solutions.kafka.cli.SourceCLI.getNewModel(SourceCLI.java:53)
at com.vertica.solutions.kafka.cli.SourceCLI.getNewModel(SourceCLI.java:15)
at com.vertica.solutions.kafka.cli.CLI.run(CLI.java:56)
at com.vertica.solutions.kafka.cli.CLI._main(CLI.java:132)
at com.vertica.solutions.kafka.cli.SourceCLI.main(SourceCLI.java:25)
Caused by: java.sql.SQLNonTransientException: [Vertica][VJDBC](5861)
ERROR: Error calling processPartition() in User Function KafkaListTopics at [/data/build-centos6/qb/buildagent/workspace/jenkins2/PrimaryBuilds/build_master/build/udx/supported/kafka/KafkaUtil.cpp:173],
error code: 0, message: Error getting metadata: [Local: Broker transport failure]
at com.vertica.util.ServerErrorData.buildException(Unknown Source)
                           . . .

使用流作业调度程序时禁用 API 版本请求

为了避免出现这些问题,必须禁用 Vertica 用于与 Kafka 通信的 rdkafka 库中的 API 版本请求功能。要想在使用调度程序时禁用此设置,最简单的方法是在运行 vkconfig 脚本的主机上设置一个 Linux 环境变量。使用该环境变量可以确保对 vkconfig 的每次调用都包含禁用 API 版本请求的设置。vkconfig 脚本检查以下两个变量:

  • VERTICA_RDKAFKA_CONF 适用于所有与主机上运行的调度程序通信的 Kafka 群集。

  • VERTICA_RDKAFKA_CONF_CLUSTER_NAME 只适用于名为 CLUSTER_NAME 的 Kafka 群集。如果您的调度程序与多个 Kafka 群集通信,其中一些没有运行 0.9 或更早版本,那么请使用此变量。

如果同时设置这两个变量,则特定于群集的变量优先。如果调度程序所连接到的大多数群集运行的是 Kafka 0.9 或更早版本,但少数运行的是 0.10 或更高版本,那么此功能非常有用。在这种情况下,可以使用 VERTICA_RDKAFKA_CONF 变量禁用大多数群集的 API 版本检查,并使用 VERTICA_RDKAFKA_CONF_CLUSTER_NAME 为特定群集重新启用检查。

如果只有少数 Kafka 群集运行 0.9 或更早版本,可以为它们设置特定于群集的变量,并为大多数群集保留默认值。

环境变量的内容是一个 JSON 对象,它通知 rdkafka 库是否查询 Kafka 群集,看群集中是否存在该对象支持的 API 版本:

'{"api.version.request":false}'

要在 BASH 中设置环境变量,请使用 export 命令:

$ export VERTICA_RDKAFKA_CONF='{"api.version.request":false}'

如果想让该设置仅影响名为 kafka_weblog 的群集,则使用以下命令:

$ export VERTICA_RDKAFKA_CONF_kafka_weblog='{"api.version.request":false}'

可以将该命令添加到任何普通用户环境配置文件(如 ~/.bash_profile)或系统范围的文件(如 /etc/profile)中。必须确保对所有可能运行 vkconfig 脚本的用户(包括由 init 等守护程序进程发出的任何调用)使用相同的设置。还可以在用于设置、配置或启动调度程序的任何脚本中直接包含此命令。

在直接加载消息时禁用 API 版本请求

使用 kafka_conf 参数直接调用 KafkaSource 加载消息时,可以禁用 API 请求:

=> CREATE FLEX TABLE iot_data();
CREATE TABLE
=> COPY public.iot_data SOURCE KafkaSource(stream='iot_json|0|-2',
                                           brokers='kafka-01.example.com:9092',
                                           stop_on_eof=True,
                                           kafka_conf='{"api.version.request":false}')
        PARSER KafkaJSONParser();
 Rows Loaded
-------------
        5000
(1 row)

另请参阅

4 - 在 Kafka 0.11 及更高版本中对 message.max.bytes 设置的更改

在 Kafka 0.10 及更早版本中,message.max.bytes 设置用来配置单个消息允许的最大大小。从 0.11 版开始,Kafka 开始按批次对消息进行分组。该版本将此设置的含义更改为一个消息批次允许的最大大小。

这一更改会对 Vertica 数据库加载 Kafka 0.11 及更高版本中消息的速度产生明显的影响。如果使用 Vertica 9.1.0 或更早版本创建流作业调度程序,您可能会发现其加载规范的 message.max.bytes 值设置得太低。升级调度程序不会自动调整此设置。

将 Vertica 与 Kafka 0.11 及更高版本结合使用时,为了防止吞吐量变慢,请手动调整调度程序加载规范中的 message.max.bytes 设置。Vertica 的测试表明,该设置的最佳值是 25165824 (24 × 1024 × 1024) 字节。

以下示例演示了如何更新现有调度程序的加载规范 weblog_load 中的 message.max.bytes 设置,该调度程序使用名为 weblog.conf 的配置文件进行定义:

$ /opt/vertica/packages/kafka/bin/vkconfig load-spec --update \
     --load-spec weblog_load --conf weblog.conf --message-max-bytes 25165824