KafkaExport

将 Vertica 数据发送到 Kafka。

如果 Vertica 成功将所有数据行导出到 Kafka,则此函数将返回零行。您可以使用此函数的输出将失败消息复制到辅助表,以备评估和重新处理。

语法

SELECT KafkaExport(partitionColumn, keyColumn, valueColumn
    USING PARAMETERS brokers='host[:port][,host...]',
    topic='topicname'
    [,kafka_conf='kafka_configuration_setting']
    [,fail_on_conf_parse_error=Boolean])
OVER (partition_clause) FROM table;

参数

partitionColumn
要导出的目标分区。如果将此值设为 NULL,Vertica 将使用默认分区架构。您可以使用 partition 实参将消息发送至映射到 Vertica 分段的分区。
keyColumn
与 valueColumn 关联的用户定义的键值。使用 NULL 跳过此实参。
valueColumn
消息自身。该列是 LONG VARCHAR,允许向 Kafka 最多发送 32MB 数据。但是,Kafka 可能会对消息大小施加自己的限制。
brokers
包含 Kafka 群集中代理的一个或多个主机名或 IP 地址(含可选端口号)的逗号分隔列表的字符串。
topic
要导出到的 Kafka 主题。
kafka_conf

采用 JSON 格式直接传递给 rdkafka 库的选项/值对的对象。这是 Vertica 用于与 Kafka 进行通信的库。您可以使用此参数直接设置通过 Vertica 与 Kafka 的集成所无法提供的配置选项。有关详细信息,请参阅直接设置 Kafka 库选项

fail_on_conf_parse_error

确定 kafka_conf 包含格式不正确的选项和值或者无效的配置属性时,该函数是否失败。

默认值:FALSE

有关接受的选项和值格式,请参阅直接设置 Kafka 库选项

有关有效配置属性的列表,请参阅 rdkafka GitHub 存储库

示例

以下示例会将 iot_report 表中的每一行转换为 JSON 对象,然后将其导出到名为 iot-report 的 Kafka 主题。

iot_report 表包含以下列和数据:

=> SELECT * FROM iot_report;
 server |        date         |        location         |    id
--------+---------------------+-------------------------+----------
      1 | 2016-10-11 04:09:28 |  -14.86058, 112.75848   | 70982027
      1 | 2017-07-02 12:37:48 |  -21.42197, -127.17672  | 49494918
      2 | 2017-01-20 03:05:24 |  37.17372, 136.14026    | 36670100
      2 | 2017-07-29 11:38:37 |  -38.99517, 171.72671   | 52049116
      1 | 2017-10-19 14:04:33 |  -71.72156, -36.27381   | 94328189
(5 rows)

要构建 KafkaExport 函数,请提供以下值以定义 Kafka 消息:

  • partitionColumn:使用 server 列。由于 server 列值从 1 开始,而 Kafka 分区从 0 开始,因此需从 server 值中减去 1。

  • keyColumn:使用 id 列。此列要求您将 id显式转换为 VARCHAR 类型。

  • valueColumn:每条消息都会在已导出的 JSON 对象中将 date 和 location 列格式化为键/值对。

    要将这些行转换为 JSON 格式,请使用 ROW 函数将 date 和 location 列转换为结构化数据。然后,将 ROW 函数传递给 TO_JSON 函数,以将数据结构编码为 JSON 对象。

填写所需的剩余函数实参并执行 KafkaExport 函数。如果成功,该函数将返回 0 行:

=> SELECT KafkaExport(server - 1, id::VARCHAR, TO_JSON(ROW(date, location))
                      USING PARAMETERS brokers='broker01:9092,broker02:9092',
                      topic='iot-results')
   OVER (PARTITION BEST)
   FROM iot_report;
 partition | key | message | failure_reason
-----------+-----+---------+----------------
(0 rows)

使用 kafkacat 验证使用者是否在 iot-results 主题中包含 JSON 格式的数据:

$ /opt/vertica/packages/kafka/bin/kafkacat -C -t iot-results -b broker01:9092,broker02:9092
{"date":"2017-01-20 03:05:24","location":" 37.17372, 136.14026   "}
{"date":"2017-07-29 11:38:37","location":" -38.99517, 171.72671  "}
{"date":"2016-10-11 04:09:28","location":" -14.86058, 112.75848  "}
{"date":"2017-10-19 14:04:33","location":" -71.72156, -36.27381  "}
{"date":"2017-07-02 12:37:48","location":" -21.42197, -127.17672 "}

另请参阅

使用 KafkaExport 生成数据