这是本节的多页打印视图。 点击此处打印.

返回本页常规视图.

Kafka 函数参考

此部分将列出构成 Vertica 的 Kafka 集成功能的函数。

1 - KafkaAvroParser

KafkaAvroParser 会解析 Avro 格式的 Kafka 消息并将其加载到常规 Vertica 表或 Vertica Flex 表中。

语法

KafkaAvroParser(param=value[,...])
enforce_length
设置为 TRUE 时,如果有值太宽而无法容纳到其所在的列中,则拒绝该行。使用默认设置 (FALSE) 时,解析器会截断任何太宽而无法容纳到列最大宽度内的值。
reject_on_materialized_type_error
设置为 TRUE 时,如果行中含有实体化列值,但该列值无法映射到实体化列的数据类型,则拒绝该行。
flatten_maps
如果设置为 TRUE,则将所有 Avro 映射扁平化。
flatten_arrays
如果设置为 TRUE,则将 Avro 数组扁平化。
flatten_records
如果设置为 TRUE,则将所有 Avro 记录扁平化。
external_schema
将 Avro 文件的架构指定为 JSON 字符串。如果未指定此参数,解析器假定每个消息都具有架构。如果使用的是架构注册表,请勿使用此参数。
codec
指定编写 Avro 文件所用的编解码器。有效值包括:
  • 'default' - 不压缩数据,因此不需要编解码器

  • 'deflate' - 使用 Deflate 编解码器压缩数据

  • 'snappy' - Snappy 压缩

with_metadata
如果设置为 TRUE,消息包括 Avro 基准、架构和对象元数据。默认情况下,KafkaAvroParser 会解析消息,但不包括架构和元数据。如果启用此参数,请使用 Avro API 编写消息,并确认它们仅包含 Avro 基准。默认值为 FALSE。
schema_registry_url
必需。指定 Confluent 架构注册表的 URL。需要使用此参数根据架构注册表版本加载数据。如果使用的是外部架构,请勿使用此参数。有关详细信息,请参考 在 Kafka 中使用架构注册表。
schema_registry_ssl_ca_path
进行 TLS 连接时需要提供。Vertica 节点文件系统上指向已对架构注册表服务器证书签名的证书颁发机构 (CA) 的路径。

每个 Vertica 节点都必须将 CA 存储在相同的文件系统路径中。

schema_registry_ssl_cert_path
Vertica 节点文件系统上指向由架构注册表信任的证书颁发机构 (CA) 所颁发的客户端证书的路径。
schema_registry_ssl_key_path
Vertica 服务器文件系统上指向使用 schema_registry_ssl_cert_path 定义的客户端证书的私钥的路径。
schema_registry_ssl_key_password_path
Vertica 服务器文件系统上指向使用 schema_registry_ssl_key_path 定义的私钥的可选密码的路径。
schema_registry_subject
在架构注册表中,表示用于加载数据的架构的主题。
schema_registry_version
在架构注册表中,表示用于加载数据的架构的版本。
key_separator
设置用作键之间的分隔符的字符。

数据类型

KafkaAvroParser 支持与 favroparser 相同的数据类型。有关详细信息,请参阅Avro 数据

示例

以下示例演示了如何以 Avro 格式从 Kafka 加载数据。该语句:

  • 会将数据加载到名为 weather_logs 的现有 Flex 表中。

  • 会从默认 Kafka 代理(通过端口 9092 在本地系统上运行)复制数据。

  • 源被命名为 temperature。

  • 源具有一个分区。

  • 加载从偏移量 0 处开始。

  • 加载在 10 秒后或在加载到达源结尾后结束,以先发生者为准。

  • KafkaAvroParser 不会将它在源中找到的任何数组、映射或记录扁平化。

  • 数据的架构将在语句中以 JSON 字符串的形式提供。该架构定义了名为 Weather 的记录类型,其中包含表示工作站名称、时间和温度的字段。

  • 被拒绝的数据行将保存到名为 t_rejects1 的表中。

=> COPY weather_logs
   SOURCE KafkaSource(stream='temperature|0|0', stop_on_eof=true,
                      duration=interval '10 seconds')
   PARSER KafkaAvroParser(flatten_arrays=False, flatten_maps=False, flatten_records=False,
                          external_schema=E'{"type":"record","name":"Weather","fields":'
                                           '[{"name":"station","type":"string"},'
                                           '{"name":"time","type":"long"},'
                                           '{"name":"temp","type":"int"}]}')
   REJECTED DATA AS TABLE "t_rejects1";

2 - KafkaCheckBrokers

检索 Kafka 群集中各个代理的相关信息。此函数主要供内部使用 — 流式传输作业调度程序会使用它来获取 Kafka 群集中代理的列表。您可以调用该函数来确定 Vertica 可识别的代理。

语法

KafkaCheckBrokers(USING PARAMETERS brokers='hostname:port[,hostname2:port...]'
                                   [, kafka_conf='kafka_configuration_setting']
                                   [, timeout=timeout_sec])
brokers
Kafka 群集中代理的主机名和端口号,用于检索代理的列表。您可以使用逗号分隔的列表提供多个代理。如果列表包含来自多个 Kafka 群集的代理,则会查询包含列表中最后一个主机的群集。
kafka_conf

采用 JSON 格式直接传递给 rdkafka 库的选项/值对的对象。这是 Vertica 用于与 Kafka 进行通信的库。您可以使用此参数直接设置通过 Vertica 与 Kafka 的集成所无法提供的配置选项。有关详细信息,请参阅直接设置 Kafka 库选项

timeout

等待 Kafka 群集做出响应的整数秒数。

示例

=> SELECT KafkaCheckBrokers(USING PARAMETERS brokers='kafka01.example.com:9092')
          OVER ();
 broker_id |        hostname     | port
-----------+---------------------+------
         2 | kafka03.example.com | 9092
         1 | kafka02.example.com | 9092
         3 | kafka04.example.com | 9092
         0 | kafka01.example.com | 9092
(4 rows)

3 - KafkaExport

将 Vertica 数据发送到 Kafka。

如果 Vertica 成功将所有数据行导出到 Kafka,则此函数将返回零行。您可以使用此函数的输出将失败消息复制到辅助表,以备评估和重新处理。

语法

SELECT KafkaExport(partitionColumn, keyColumn, valueColumn
    USING PARAMETERS brokers='host[:port][,host...]',
    topic='topicname'
    [,kafka_conf='kafka_configuration_setting']
    [,fail_on_conf_parse_error=Boolean])
OVER (partition_clause) FROM table;

参数

partitionColumn
要导出的目标分区。如果将此值设为 NULL,Vertica 将使用默认分区架构。您可以使用 partition 实参将消息发送至映射到 Vertica 分段的分区。
keyColumn
与 valueColumn 关联的用户定义的键值。使用 NULL 跳过此实参。
valueColumn
消息自身。该列是 LONG VARCHAR,允许向 Kafka 最多发送 32MB 数据。但是,Kafka 可能会对消息大小施加自己的限制。
brokers
包含 Kafka 群集中代理的一个或多个主机名或 IP 地址(含可选端口号)的逗号分隔列表的字符串。
topic
要导出到的 Kafka 主题。
kafka_conf

采用 JSON 格式直接传递给 rdkafka 库的选项/值对的对象。这是 Vertica 用于与 Kafka 进行通信的库。您可以使用此参数直接设置通过 Vertica 与 Kafka 的集成所无法提供的配置选项。有关详细信息,请参阅直接设置 Kafka 库选项

fail_on_conf_parse_error

确定 kafka_conf 包含格式不正确的选项和值或者无效的配置属性时,该函数是否失败。

默认值:FALSE

有关接受的选项和值格式,请参阅直接设置 Kafka 库选项

有关有效配置属性的列表,请参阅 rdkafka GitHub 存储库

示例

以下示例会将 iot_report 表中的每一行转换为 JSON 对象,然后将其导出到名为 iot-report 的 Kafka 主题。

iot_report 表包含以下列和数据:

=> SELECT * FROM iot_report;
 server |        date         |        location         |    id
--------+---------------------+-------------------------+----------
      1 | 2016-10-11 04:09:28 |  -14.86058, 112.75848   | 70982027
      1 | 2017-07-02 12:37:48 |  -21.42197, -127.17672  | 49494918
      2 | 2017-01-20 03:05:24 |  37.17372, 136.14026    | 36670100
      2 | 2017-07-29 11:38:37 |  -38.99517, 171.72671   | 52049116
      1 | 2017-10-19 14:04:33 |  -71.72156, -36.27381   | 94328189
(5 rows)

要构建 KafkaExport 函数,请提供以下值以定义 Kafka 消息:

  • partitionColumn:使用 server 列。由于 server 列值从 1 开始,而 Kafka 分区从 0 开始,因此需从 server 值中减去 1。

  • keyColumn:使用 id 列。此列要求您将 id显式转换为 VARCHAR 类型。

  • valueColumn:每条消息都会在已导出的 JSON 对象中将 date 和 location 列格式化为键/值对。

    要将这些行转换为 JSON 格式,请使用 ROW 函数将 date 和 location 列转换为结构化数据。然后,将 ROW 函数传递给 TO_JSON 函数,以将数据结构编码为 JSON 对象。

填写所需的剩余函数实参并执行 KafkaExport 函数。如果成功,该函数将返回 0 行:

=> SELECT KafkaExport(server - 1, id::VARCHAR, TO_JSON(ROW(date, location))
                      USING PARAMETERS brokers='broker01:9092,broker02:9092',
                      topic='iot-results')
   OVER (PARTITION BEST)
   FROM iot_report;
 partition | key | message | failure_reason
-----------+-----+---------+----------------
(0 rows)

使用 kafkacat 验证使用者是否在 iot-results 主题中包含 JSON 格式的数据:

$ /opt/vertica/packages/kafka/bin/kafkacat -C -t iot-results -b broker01:9092,broker02:9092
{"date":"2017-01-20 03:05:24","location":" 37.17372, 136.14026   "}
{"date":"2017-07-29 11:38:37","location":" -38.99517, 171.72671  "}
{"date":"2016-10-11 04:09:28","location":" -14.86058, 112.75848  "}
{"date":"2017-10-19 14:04:33","location":" -71.72156, -36.27381  "}
{"date":"2017-07-02 12:37:48","location":" -21.42197, -127.17672 "}

另请参阅

使用 KafkaExport 生成数据

4 - KafkaJSONParser

KafkaJSONParser 会解析 JSON 格式的 Kafka 消息并将其加载到常规 Vertica 表或 Vertica Flex 表中。

语法

KafkaJSONParser(
        [enforce_length=Boolean]
        [, flatten_maps=Boolean]
        [, flatten_arrays=Boolean]
        [, start_point=string]
        [, start_point_occurrence=integer]
        [, omit_empty_keys=Boolean]
        [, reject_on_duplicate=Boolean]
        [, reject_on_materialized_type_error=Boolean]
        [, reject_on_empty_key=Boolean]
        [, key_separator=char]
        [, suppress_nonalphanumeric_key_chars=Boolean]
        )
enforce_length
设置为 TRUE 时,如果加载的数据太宽而无法容纳到其所在的列中,则拒绝该行。默认设置为 FALSE,以截断任何太宽而无法容纳到其所在列的数据。
flatten_maps
如果设置为 TRUE,则将所有 JSON 映射扁平化。
flatten_arrays
如果设置为 TRUE,则将 JSON 数组扁平化。
start_point
指定解析器应解析的 JSON 数据中的键。解析器仅提取与 start_point 键关联的值内的数据。它会解析数据内 start_point 键的所有实例的值。
start_point_occurrence
整数值,指示由 start_point 参数指定的键的出现位置,解析器应从该位置开始解析。例如,如果将此值设置为 4,则解析器将仅从 start_point 键第五次出现时开始加载数据。仅在同时提供 start_point 参数时才有效。
omit_empty_keys
如果设置为 TRUE,则忽略加载数据中不含值集的所有键。
reject_on_duplicate
如果设置为 TRUE,则拒绝含有重复键名称的行。键名不区分大小写,因此键“mykey”和“MyKey”将被视为重复。
reject_on_materialized_type_error
如果设置为 TRUE,则在数据含有与现有实体化列匹配的键,但其中一个键无法映射到实体化列的数据类型时,拒绝该行。
reject_on_empty_key
如果设置为 TRUE,则拒绝含有键但不含值的所有行。
key_separator
用作键值之间的分隔符的单个字符,而不是默认句点 (.) 字符。
suppress_nonalphanumeric_key_chars
如果设置为 TRUE,则将 JSON 键值中的所有非字母数字字符替换为下划线 (_) 字符。

有关详细信息,请参阅JSON 数据

以下示例演示了如何从 Kafka 加载 JSON 数据。该语句中的参数根据加载情况定义了以下事项:

  • 将数据加载到预先存在且名为 logs 的表中。

  • KafkaSource 会从名为 server_log 的源中的单个分区流式传输数据。

  • 用于加载数据的 Kafka 代理将通过端口 9092 在名为 kafka01 的主机上运行。

  • KafkaSource 在 10 秒后或到达流的结尾时停止加载数据,以先发生者为准。

  • KafkJSONParser 会将 JSON 数据中的任何数组或映射扁平化。

=> COPY logs SOURCE KafkaSource(stream='server_log|0|0',
                                stop_on_eof=true,
                                duration=interval '10 seconds',
                                brokers='kafka01:9092')
   PARSER KafkaJSONParser(flatten_arrays=True, flatten_maps=True);

5 - KafkaListManyTopics

从 Kafka 代理检索有关所有主题的信息。此函数列出 Kafka 群集中定义的所有主题,以及它包含的分区数量和为这些主题提供服务的代理。

语法

KafkaListManyTopics('broker:port[;...]'
                    [USING PARAMETERS
                        [kafka_conf='kafka_configuration_setting'
                        [, timeout=timeout_sec]])
broker
Kafka 群集中代理的主机名(或 IP 地址)
port
运行代理的端口号。
kafka_conf

采用 JSON 格式直接传递给 rdkafka 库的选项/值对的对象。这是 Vertica 用于与 Kafka 进行通信的库。您可以使用此参数直接设置通过 Vertica 与 Kafka 的集成所无法提供的配置选项。有关详细信息,请参阅直接设置 Kafka 库选项

timeout

等待 Kafka 群集做出响应的整数秒数。

示例

=> \x
Expanded display is on.
=> SELECT KafkaListManyTopics('kafka01.example.com:9092')
   OVER (PARTITION AUTO);
-[ RECORD 1 ]--+--------------------------------------------------
brokers        | kafka01.example.com:9092,kafka02.example.com:9092
topic          | __consumer_offsets
num_partitions | 50
-[ RECORD 2 ]--+--------------------------------------------------
brokers        | kafka01.example.com:9092,kafka02.example.com:9092
topic          | iot_data
num_partitions | 1
-[ RECORD 3 ]--+--------------------------------------------------
brokers        | kafka01.example.com:9092,kafka02.example.com:9092
topic          | test
num_partitions | 1

6 - KafkaListTopics

从 Kafka 代理获取可用主题列表。

语法

KafkaListTopics(USING PARAMETERS brokers='hostname:port[,hostname2:port2...]'
                                 [, kafka_conf='kafka_configuration_setting']
                                 [, timeout=timeout_sec])
brokers
要查询主题列表的代理的主机名和端口号。您可以使用逗号分隔的列表提供多个代理。但是,返回的列表将仅包含列表中最后一个代理服务的主题。
kafka_conf

采用 JSON 格式直接传递给 rdkafka 库的选项/值对的对象。这是 Vertica 用于与 Kafka 进行通信的库。您可以使用此参数直接设置通过 Vertica 与 Kafka 的集成所无法提供的配置选项。有关详细信息,请参阅直接设置 Kafka 库选项

timeout

等待 Kafka 群集做出响应的整数秒数。

示例

=> SELECT KafkaListTopics(USING PARAMETERS brokers='kafka1-01.example.com:9092')
          OVER ();
         topic         | num_partitions
-----------------------+----------------
 test                  |              1
 iot_data              |              1
 __consumer_offsets    |             50
 vertica_notifications |              1
 web_hits              |              1
(5 rows)

7 - KafkaOffsets

KafkaOffsets 用户定义的转换函数返回最近通过调用 KafkaSource 生成的加载操作统计信息。查询 KafkaOffsets 可查看最近加载操作生成的元数据。可以在每次调用 KafkaSource 后查询 KafkaOffsets,以查看有关该加载的信息。如果使用的是调度程序,还可以在 stream_microbatch_history 表中查看历史加载信息。

对于每个加载操作,KafkaOffsets 返回以下内容:

  • 源 kafka 主题

  • 源 kafka 分区

  • 起始偏移量

  • 结束偏移量

  • 加载的消息数

  • 读取的字节数

  • 加载操作的持续时间

  • 结束消息

  • 结束原因

以下示例演示了如何调用 KafkaOffsets 以显示使用 KafkaSource 加载的名为 web_test 的表的分区信息。

=> SELECT kpartition, start_offset, end_offset, msg_count, ending FROM (select KafkaOffsets() over()
   FROM web_test) AS stats ORDER BY kpartition;
 kpartition | start_offset | end_offset | msg_count |   ending
------------+--------------+------------+-----------+------------
          0 |           -2 |       9999 |      1068 | END_OFFSET

输出显示,KafkaSource 从 Kafka 加载了单个分区中的 1068 条消息(行)。KafkaSource 因为到达了结束偏移量而结束了数据加载。

8 - KafkaParser

KafkaParser 不解析从 Kafka 加载的数据。相反,它以 LONG VARCHAR 值的形式传递消息。当您希望将原始 Kafka 消息加载到 Vertica 进行进一步处理时,请使用此解析器。您可以将此解析器用作不支持的格式均可使用的通用解析器。

KafkaParser 不接受任何参数。

示例

以下示例将原始消息从名为 iot-data 的 Kafka 主题加载到一个名为 raw_iot 的表中。

=> CREATE TABLE raw_iot(message LONG VARCHAR);
CREATE TABLE
=> COPY raw_iot SOURCE KafkaSource(stream='iot-data|0|-2,iot-data|1|-2,iot-data|2|-2',
                                   brokers='docd01:6667,docd03:6667', stop_on_eof=TRUE)
                PARSER KafkaParser();
 Rows Loaded
-------------
        5000
(1 row)

=> select * from raw_iot limit 10;
              message
------------------------------------
 10039, 59, -68.951406, -19.270126
 10042, 40, -82.688712, 4.7187705
 10054, 6, -153.805268, -10.5173935
 10054, 71, -135.613150, 58.286458
 10081, 44, 130.288419, -77.344405
 10104, -5, 77.882598, -56.600744
 10132, 87, 103.530616, -69.672863
 10135, 6, -121.420382, 15.3229855
 10166, 77, -179.592211, 42.0477075
 10183, 62, 17.225394, -55.6644765
(10 rows)

9 - KafkaSource

KafkaSource UDL 可访问 Kafka 群集中的数据。所有 Kafka 解析器都必须使用 KafkaSource。KafkaSource 处理的消息长度必须至少为一个字节。对于零长度消息,KafkaSource 会将错误消息写入 vertica.log。

KafkaSource 的输出不能直接使用 Vertica 中的任何非 Kafka 解析器(如 FCSVPARSER)。KafkaParser 会生成其他有关流的元数据,解析器需要使用这些元数据才能正确解析数据。您必须使用 KafkaInsertDelimiters 等筛选器将数据转换为其他解析器可以处理的格式。有关更多示例,请参阅解析自定义格式

您可以使用 CLOSE_ALL_SESSIONS 等关闭会话函数来取消正在运行的 KafkaSource 数据加载。

语法

KafkaSource(stream='topic_name|partition|start_offset[|end_offset]'[, param=value [,...] ] )
stream
必需。将要加载的数据定义为一个或多个分区的逗号分隔列表。每个分区由三个必需值和一个可选值构成,用管道字符 (|) 进行分隔:
  • topic_name:要从其加载数据的 Kafka 主题的名称。您可以从同一流参数中的不同 Kafka 主题进行读取,但存在一些限制。有关详细信息,请参阅从同一流参数中的多个主题进行加载

  • partition:要复制的 Kafka 主题中的分区。

  • start_offset:Kafka 主题中加载开始处的偏移量。此偏移量包含在内(偏移量为 start_offset 的消息也会加载)。有关其他选项,请参阅下面的特殊起始偏移量值

  • end_offset:加载应结束处的可选偏移量。此偏移量不包含在内(将不加载偏移量为 end_offset 的消息)。
    要使用 end_offset 结束加载,必须为 stream 参数中的所有分区提供结束偏移量值。如果尝试为某些分区设置结束偏移量而不为其他分区设置偏移量值,则会导致出错。
    如果不指定结束偏移量,则必须使用 stop_on_eofduration 提供至少一个其他结束条件。

brokers
Kafka 群集中代理的 host:port 对的逗号分隔列表。Vertica 建议在与 Vertica 不同的计算机上运行 Kafka。

默认值: localhost:9092

duration
指定时间范围持续时间的间隔。在此指定时间过后,KafkaSource 会终止 COPY 语句。如果未设置此参数,则必须使用 stop_on_eof 来设置至少一个其他结束条件,或者改为指定结束偏移量。有关详细信息,请参阅下面的持续时间说明
executionparallelism
加载数据时要使用的线程数。通常,将此参数设置为介于 1 和节点要从其加载的分区数之间的整数值。将此参数设置为更小的值可限制用于处理任何 COPY 语句的线程数。此外,还会增加池中发出的短查询的吞吐量,特别是在并发执行多个查询时更是如此。

如果未指定此参数,Vertica 会自动为每个分区创建一个线程,直至到达资源池允许的限值。

如果为 KafkaSource 指定的值小于为调度程序资源池指定的值,则使用 KafkaSource 值。此值不能超过为调度程序的资源池指定的值。

stop_on_eof
确定 KafkaSource 在 COPY 语句到达文件结尾后是否终止该语句。如果未设置此值,则必须使用 duration 或通过提供结束偏移量来设置至少一个其他结束条件。

默认值: FALSE

group_id

Kafka 使用者组(Vertica 将向其报告消费消息进度)的名称。设置此值以禁用向 Kafka 使用者组报告进度。有关详细信息,请参阅通过使用者组监控 Vertica 消息使用情况

默认值: vertica_database-name

kafka_conf

采用 JSON 格式直接传递给 rdkafka 库的选项/值对的对象。这是 Vertica 用于与 Kafka 进行通信的库。您可以使用此参数直接设置通过 Vertica 与 Kafka 的集成所无法提供的配置选项。有关详细信息,请参阅直接设置 Kafka 库选项

fail_on_conf_parse_error

确定 kafka_conf 包含格式不正确的选项和值或者无效的配置属性时,该函数是否失败。

默认值:FALSE

有关接受的选项和值格式,请参阅直接设置 Kafka 库选项

有关有效配置属性的列表,请参阅 rdkafka GitHub 存储库

stream 参数的 start_offset 部分允许您从主题分区中的特定点开始加载消息。它还接受两个特殊偏移值之一:

  • -2 告知Set Snippet Variable Value in Topic从主题分区中最早可用的消息开始加载。当您需要从 Kafka 主题的分区中尽量加载更多消息时,此值十分有用。

  • -3 告知Set Snippet Variable Value in Topic从使用者组保存的偏移量开始加载。如果使用者组没有保存的偏移量,它会从主题分区中最早可用的消息开始加载。有关详细信息,请参阅通过使用者组监控 Vertica 消息使用情况

从同一流参数中的多个主题进行加载

只要遵循以下准则,便可从同一流参数中的多个 Kafka 主题进行加载:

  • 主题的数据必须采用相同的格式,因为要将数据从 KafkaSource 传递到同一解析器。例如,不能从一个采用 Avro 格式的主题和另一个采用 JSON 格式的主题加载数据。

  • 同样,如果要加载 Avro 数据并从注册表中指定外部架构,则需要小心谨慎。Avro 解析器在每次加载数据时接受一个架构。如果不同主题中的数据具有不同的架构,则其中一个主题中的所有数据都将被拒绝。

  • 不同主题中的数据应具有相同(或非常类似)的架构,尤其是在将数据加载到传统 Vertica 表中时更是如此。虽然可以将具有不同架构的数据加载到 Flex 表中,但只有在少数情况下,将不同的数据合并到一个表中才有意义。

持续时间说明

duration 参数适用于 Vertica 允许 KafkaSource 函数运行的时间长度。它通常反映整个加载语句所花费的时间。但是,如果 KafkaSource 要加载大量数据或数据需要大量处理和解析,则查询的整体运行时间可能会超过在 duration 中指定的时间量。

示例

以下示例演示了如何调用 KafkaSource 以使用以下选项将数据从 Kafka 加载到名为 web_table 的现有 Flex 表中:

  • 名为 web_hits 的流只有一个分区。

  • 加载将从流中的最早消息开始(通过传递 -2 作为起始偏移量来标识)。

  • 加载在到达偏移量为 1000 的消息时结束。

  • Kafka 群集的代理是 example.com 域中的 kafka01 和 kafka03。

  • 这些代理正在侦听端口 9092。

  • 如果在到达偏移量为 1000 的消息之前到达流的结尾,加载将结束。如果不提供此选项,连接器将等到 Kafka 发送偏移量为 1000 的消息为止。

  • 已加载的数据将发送到 KafkaJSONParser 进行处理。

=> COPY web_table
   SOURCE KafkaSource(stream='web_hits|0|-2|1000',
                      brokers='kafka01.example.com:9092,kafka03.example.com:9092',
                      stop_on_eof=true)
   PARSER KafkaJSONParser();
 Rows Loaded
-------------
        1000
(1 row)

要查看有关此加载操作的详细信息,请查询 KafkaOffsets。KafkaOffsets 会返回 Vertica 在最近一次调用 KafkaSource 期间从 Kafka 使用的消息的相关元数据:

=> SELECT KafkaOffsets() OVER();
  ktopic  | kpartition | start_offset | end_offset | msg_count | bytes_read |    duration     |   ending   |      end_msg
----------+------------+--------------+------------+-----------+------------+-----------------+------------+-------------------
 web_hits |          0 |            0 |        999 |      1000 |     197027 | 00:00:00.385365 | END_OFFSET | Last message read
(1 row)

msg_count 列用于验证 Vertica 是否已加载 1000 条消息,ending 列指示 Vertica 在到达偏移量为 1000 的消息时已停止使用消息。

10 - KafkaTopicDetails

从一个或多个 Kafka 代理检索有关指定主题的信息。此函数将列出有关主题分区的详细信息,以及为 Kafka 群集中的每个分区提供服务的 Kafka 代理。

语法

KafkaTopicDetails(USING PARAMETERS brokers='hostname:port[,hostname2:port2...]'
                                  , topic=topic_name
                                 [, kafka_conf='option=value[;option2=value2...]']
                                 [, timeout=timeout_sec])
brokers
Kafka 群集中代理的主机名(或 IP 地址)。
port
运行代理的端口号。
topic
您需要获取其详细信息的 Kafka 主题。
kafka_conf
要直接传递给 rdkafka 库的 option=value 对的分号分隔列表。这是 Vertica 用于与 Kafka 进行通信的库。您可以使用此参数直接设置通过 Vertica 与 Kafka 的集成所无法提供的配置选项。有关详细信息,请参阅直接设置 Kafka 库选项
timeout
等待 Kafka 群集做出响应的整数秒数。

示例

=> SELECT KafkaTopicDetails(USING PARAMETERS brokers='kafka1-01.example.com:9092',topic='iot_data') OVER();
 partition_id | lead_broker | replica_brokers | in_sync_replica_brokers
--------------+-------------+-----------------+-------------------------
            0 |           0 | 0               | 0
            1 |           1 | 1               | 1
            2 |           0 | 0               | 0
            3 |           1 | 1               | 1
            4 |           0 | 0               | 0
(5 rows)