KafkaSource

KafkaSource UDL 可访问 Kafka 群集中的数据。所有 Kafka 解析器都必须使用 KafkaSource。KafkaSource 处理的消息长度必须至少为一个字节。对于零长度消息,KafkaSource 会将错误消息写入 vertica.log。

KafkaSource 的输出不能直接使用 Vertica 中的任何非 Kafka 解析器(如 FCSVPARSER)。KafkaParser 会生成其他有关流的元数据,解析器需要使用这些元数据才能正确解析数据。您必须使用 KafkaInsertDelimiters 等筛选器将数据转换为其他解析器可以处理的格式。有关更多示例,请参阅解析自定义格式

您可以使用 CLOSE_ALL_SESSIONS 等关闭会话函数来取消正在运行的 KafkaSource 数据加载。

语法

KafkaSource(stream='topic_name|partition|start_offset[|end_offset]'[, param=value [,...] ] )
stream
必需。将要加载的数据定义为一个或多个分区的逗号分隔列表。每个分区由三个必需值和一个可选值构成,用管道字符 (|) 进行分隔:
  • topic_name:要从其加载数据的 Kafka 主题的名称。您可以从同一流参数中的不同 Kafka 主题进行读取,但存在一些限制。有关详细信息,请参阅从同一流参数中的多个主题进行加载

  • partition:要复制的 Kafka 主题中的分区。

  • start_offset:Kafka 主题中加载开始处的偏移量。此偏移量包含在内(偏移量为 start_offset 的消息也会加载)。有关其他选项,请参阅下面的特殊起始偏移量值

  • end_offset:加载应结束处的可选偏移量。此偏移量不包含在内(将不加载偏移量为 end_offset 的消息)。
    要使用 end_offset 结束加载,必须为 stream 参数中的所有分区提供结束偏移量值。如果尝试为某些分区设置结束偏移量而不为其他分区设置偏移量值,则会导致出错。
    如果不指定结束偏移量,则必须使用 stop_on_eofduration 提供至少一个其他结束条件。

brokers
Kafka 群集中代理的 host:port 对的逗号分隔列表。Vertica 建议在与 Vertica 不同的计算机上运行 Kafka。

默认值: localhost:9092

duration
指定时间范围持续时间的间隔。在此指定时间过后,KafkaSource 会终止 COPY 语句。如果未设置此参数,则必须使用 stop_on_eof 来设置至少一个其他结束条件,或者改为指定结束偏移量。有关详细信息,请参阅下面的持续时间说明
executionparallelism
加载数据时要使用的线程数。通常,将此参数设置为介于 1 和节点要从其加载的分区数之间的整数值。将此参数设置为更小的值可限制用于处理任何 COPY 语句的线程数。此外,还会增加池中发出的短查询的吞吐量,特别是在并发执行多个查询时更是如此。

如果未指定此参数,Vertica 会自动为每个分区创建一个线程,直至到达资源池允许的限值。

如果为 KafkaSource 指定的值小于为调度程序资源池指定的值,则使用 KafkaSource 值。此值不能超过为调度程序的资源池指定的值。

stop_on_eof
确定 KafkaSource 在 COPY 语句到达文件结尾后是否终止该语句。如果未设置此值,则必须使用 duration 或通过提供结束偏移量来设置至少一个其他结束条件。

默认值: FALSE

group_id

Kafka 使用者组(Vertica 将向其报告消费消息进度)的名称。设置此值以禁用向 Kafka 使用者组报告进度。有关详细信息,请参阅通过使用者组监控 Vertica 消息使用情况

默认值: vertica_database-name

kafka_conf

采用 JSON 格式直接传递给 rdkafka 库的选项/值对的对象。这是 Vertica 用于与 Kafka 进行通信的库。您可以使用此参数直接设置通过 Vertica 与 Kafka 的集成所无法提供的配置选项。有关详细信息,请参阅直接设置 Kafka 库选项

fail_on_conf_parse_error

确定 kafka_conf 包含格式不正确的选项和值或者无效的配置属性时,该函数是否失败。

默认值:FALSE

有关接受的选项和值格式,请参阅直接设置 Kafka 库选项

有关有效配置属性的列表,请参阅 rdkafka GitHub 存储库

stream 参数的 start_offset 部分允许您从主题分区中的特定点开始加载消息。它还接受两个特殊偏移值之一:

  • -2 告知Set Snippet Variable Value in Topic从主题分区中最早可用的消息开始加载。当您需要从 Kafka 主题的分区中尽量加载更多消息时,此值十分有用。

  • -3 告知Set Snippet Variable Value in Topic从使用者组保存的偏移量开始加载。如果使用者组没有保存的偏移量,它会从主题分区中最早可用的消息开始加载。有关详细信息,请参阅通过使用者组监控 Vertica 消息使用情况

从同一流参数中的多个主题进行加载

只要遵循以下准则,便可从同一流参数中的多个 Kafka 主题进行加载:

  • 主题的数据必须采用相同的格式,因为要将数据从 KafkaSource 传递到同一解析器。例如,不能从一个采用 Avro 格式的主题和另一个采用 JSON 格式的主题加载数据。

  • 同样,如果要加载 Avro 数据并从注册表中指定外部架构,则需要小心谨慎。Avro 解析器在每次加载数据时接受一个架构。如果不同主题中的数据具有不同的架构,则其中一个主题中的所有数据都将被拒绝。

  • 不同主题中的数据应具有相同(或非常类似)的架构,尤其是在将数据加载到传统 Vertica 表中时更是如此。虽然可以将具有不同架构的数据加载到 Flex 表中,但只有在少数情况下,将不同的数据合并到一个表中才有意义。

持续时间说明

duration 参数适用于 Vertica 允许 KafkaSource 函数运行的时间长度。它通常反映整个加载语句所花费的时间。但是,如果 KafkaSource 要加载大量数据或数据需要大量处理和解析,则查询的整体运行时间可能会超过在 duration 中指定的时间量。

示例

以下示例演示了如何调用 KafkaSource 以使用以下选项将数据从 Kafka 加载到名为 web_table 的现有 Flex 表中:

  • 名为 web_hits 的流只有一个分区。

  • 加载将从流中的最早消息开始(通过传递 -2 作为起始偏移量来标识)。

  • 加载在到达偏移量为 1000 的消息时结束。

  • Kafka 群集的代理是 example.com 域中的 kafka01 和 kafka03。

  • 这些代理正在侦听端口 9092。

  • 如果在到达偏移量为 1000 的消息之前到达流的结尾,加载将结束。如果不提供此选项,连接器将等到 Kafka 发送偏移量为 1000 的消息为止。

  • 已加载的数据将发送到 KafkaJSONParser 进行处理。

=> COPY web_table
   SOURCE KafkaSource(stream='web_hits|0|-2|1000',
                      brokers='kafka01.example.com:9092,kafka03.example.com:9092',
                      stop_on_eof=true)
   PARSER KafkaJSONParser();
 Rows Loaded
-------------
        1000
(1 row)

要查看有关此加载操作的详细信息,请查询 KafkaOffsets。KafkaOffsets 会返回 Vertica 在最近一次调用 KafkaSource 期间从 Kafka 使用的消息的相关元数据:

=> SELECT KafkaOffsets() OVER();
  ktopic  | kpartition | start_offset | end_offset | msg_count | bytes_read |    duration     |   ending   |      end_msg
----------+------------+--------------+------------+-----------+------------+-----------------+------------+-------------------
 web_hits |          0 |            0 |        999 |      1000 |     197027 | 00:00:00.385365 | END_OFFSET | Last message read
(1 row)

msg_count 列用于验证 Vertica 是否已加载 1000 条消息,ending 列指示 Vertica 在到达偏移量为 1000 的消息时已停止使用消息。