KafkaSource
KafkaSource UDL 可访问 Kafka 群集中的数据。所有 Kafka 解析器都必须使用 KafkaSource。KafkaSource 处理的消息长度必须至少为一个字节。对于零长度消息,KafkaSource 会将错误消息写入 vertica.log。
KafkaSource 的输出不能直接使用 Vertica 中的任何非 Kafka 解析器(如 FCSVPARSER)。KafkaParser 会生成其他有关流的元数据,解析器需要使用这些元数据才能正确解析数据。您必须使用 KafkaInsertDelimiters 等筛选器将数据转换为其他解析器可以处理的格式。有关更多示例,请参阅解析自定义格式。
您可以使用 CLOSE_ALL_SESSIONS 等关闭会话函数来取消正在运行的 KafkaSource 数据加载。
语法
KafkaSource(stream='topic_name|partition|start_offset[|end_offset]'[, param=value [,...] ] )
stream
- 必需。将要加载的数据定义为一个或多个分区的逗号分隔列表。每个分区由三个必需值和一个可选值构成,用管道字符 (|) 进行分隔:
-
topic_name:要从其加载数据的 Kafka 主题的名称。您可以从同一流参数中的不同 Kafka 主题进行读取,但存在一些限制。有关详细信息,请参阅从同一流参数中的多个主题进行加载。
-
partition:要复制的 Kafka 主题中的分区。
-
start_offset:Kafka 主题中加载开始处的偏移量。此偏移量包含在内(偏移量为 start_offset 的消息也会加载)。有关其他选项,请参阅下面的特殊起始偏移量值。
-
end_offset:加载应结束处的可选偏移量。此偏移量不包含在内(将不加载偏移量为
end_offset
的消息)。
要使用 end_offset 结束加载,必须为 stream 参数中的所有分区提供结束偏移量值。如果尝试为某些分区设置结束偏移量而不为其他分区设置偏移量值,则会导致出错。
如果不指定结束偏移量,则必须使用stop_on_eof
或duration
提供至少一个其他结束条件。
-
brokers
- Kafka 群集中代理的 host:port 对的逗号分隔列表。Vertica 建议在与 Vertica 不同的计算机上运行 Kafka。
默认值: localhost:9092
duration
- 指定时间范围持续时间的间隔。在此指定时间过后,KafkaSource 会终止 COPY 语句。如果未设置此参数,则必须使用 stop_on_eof 来设置至少一个其他结束条件,或者改为指定结束偏移量。有关详细信息,请参阅下面的持续时间说明。
executionparallelism
- 加载数据时要使用的线程数。通常,将此参数设置为介于 1 和节点要从其加载的分区数之间的整数值。将此参数设置为更小的值可限制用于处理任何 COPY 语句的线程数。此外,还会增加池中发出的短查询的吞吐量,特别是在并发执行多个查询时更是如此。
如果未指定此参数,Vertica 会自动为每个分区创建一个线程,直至到达资源池允许的限值。
如果为 KafkaSource 指定的值小于为调度程序资源池指定的值,则使用 KafkaSource 值。此值不能超过为调度程序的资源池指定的值。
stop_on_eof
- 确定 KafkaSource 在 COPY 语句到达文件结尾后是否终止该语句。如果未设置此值,则必须使用
duration
或通过提供结束偏移量来设置至少一个其他结束条件。默认值: FALSE
group_id
Kafka 使用者组(Vertica 将向其报告消费消息进度)的名称。设置此值以禁用向 Kafka 使用者组报告进度。有关详细信息,请参阅通过使用者组监控 Vertica 消息使用情况。
默认值:
vertica_database-name
kafka_conf
采用 JSON 格式直接传递给 rdkafka 库的选项/值对的对象。这是 Vertica 用于与 Kafka 进行通信的库。您可以使用此参数直接设置通过 Vertica 与 Kafka 的集成所无法提供的配置选项。有关详细信息,请参阅直接设置 Kafka 库选项。
fail_on_conf_parse_error
确定
kafka_conf
包含格式不正确的选项和值或者无效的配置属性时,该函数是否失败。默认值:FALSE
有关接受的选项和值格式,请参阅直接设置 Kafka 库选项。
有关有效配置属性的列表,请参阅 rdkafka GitHub 存储库。
stream
参数的 start_offset 部分允许您从主题分区中的特定点开始加载消息。它还接受两个特殊偏移值之一:
-
-2 告知Set Snippet Variable Value in Topic从主题分区中最早可用的消息开始加载。当您需要从 Kafka 主题的分区中尽量加载更多消息时,此值十分有用。
-
-3 告知Set Snippet Variable Value in Topic从使用者组保存的偏移量开始加载。如果使用者组没有保存的偏移量,它会从主题分区中最早可用的消息开始加载。有关详细信息,请参阅通过使用者组监控 Vertica 消息使用情况。
从同一流参数中的多个主题进行加载
只要遵循以下准则,便可从同一流参数中的多个 Kafka 主题进行加载:
-
主题的数据必须采用相同的格式,因为要将数据从 KafkaSource 传递到同一解析器。例如,不能从一个采用 Avro 格式的主题和另一个采用 JSON 格式的主题加载数据。
-
同样,如果要加载 Avro 数据并从注册表中指定外部架构,则需要小心谨慎。Avro 解析器在每次加载数据时接受一个架构。如果不同主题中的数据具有不同的架构,则其中一个主题中的所有数据都将被拒绝。
-
不同主题中的数据应具有相同(或非常类似)的架构,尤其是在将数据加载到传统 Vertica 表中时更是如此。虽然可以将具有不同架构的数据加载到 Flex 表中,但只有在少数情况下,将不同的数据合并到一个表中才有意义。
持续时间说明
duration
参数适用于 Vertica 允许 KafkaSource 函数运行的时间长度。它通常反映整个加载语句所花费的时间。但是,如果 KafkaSource 要加载大量数据或数据需要大量处理和解析,则查询的整体运行时间可能会超过在 duration
中指定的时间量。
示例
以下示例演示了如何调用 KafkaSource 以使用以下选项将数据从 Kafka 加载到名为 web_table 的现有 Flex 表中:
-
名为 web_hits 的流只有一个分区。
-
加载将从流中的最早消息开始(通过传递 -2 作为起始偏移量来标识)。
-
加载在到达偏移量为 1000 的消息时结束。
-
Kafka 群集的代理是 example.com 域中的 kafka01 和 kafka03。
-
这些代理正在侦听端口 9092。
-
如果在到达偏移量为 1000 的消息之前到达流的结尾,加载将结束。如果不提供此选项,连接器将等到 Kafka 发送偏移量为 1000 的消息为止。
-
已加载的数据将发送到 KafkaJSONParser 进行处理。
=> COPY web_table
SOURCE KafkaSource(stream='web_hits|0|-2|1000',
brokers='kafka01.example.com:9092,kafka03.example.com:9092',
stop_on_eof=true)
PARSER KafkaJSONParser();
Rows Loaded
-------------
1000
(1 row)
要查看有关此加载操作的详细信息,请查询 KafkaOffsets。KafkaOffsets 会返回 Vertica 在最近一次调用 KafkaSource 期间从 Kafka 使用的消息的相关元数据:
=> SELECT KafkaOffsets() OVER();
ktopic | kpartition | start_offset | end_offset | msg_count | bytes_read | duration | ending | end_msg
----------+------------+--------------+------------+-----------+------------+-----------------+------------+-------------------
web_hits | 0 | 0 | 999 | 1000 | 197027 | 00:00:00.385365 | END_OFFSET | Last message read
(1 row)
msg_count
列用于验证 Vertica 是否已加载 1000 条消息,ending
列指示 Vertica 在到达偏移量为 1000 的消息时已停止使用消息。