KafkaExport
将 Vertica 数据发送到 Kafka。
如果 Vertica 成功将所有数据行导出到 Kafka,则此函数将返回零行。您可以使用此函数的输出将失败消息复制到辅助表,以备评估和重新处理。
语法
SELECT KafkaExport(partitionColumn, keyColumn, valueColumn
USING PARAMETERS brokers='host[:port][,host...]',
topic='topicname'
[,kafka_conf='kafka_configuration_setting']
[,fail_on_conf_parse_error=Boolean])
OVER (partition_clause) FROM table;
参数
- partitionColumn
- 要导出的目标分区。如果将此值设为 NULL,Vertica 将使用默认分区架构。您可以使用 partition 实参将消息发送至映射到 Vertica 分段的分区。
- keyColumn
- 与 valueColumn 关联的用户定义的键值。使用 NULL 跳过此实参。
- valueColumn
- 消息自身。该列是 LONG VARCHAR,允许向 Kafka 最多发送 32MB 数据。但是,Kafka 可能会对消息大小施加自己的限制。
brokers
- 包含 Kafka 群集中代理的一个或多个主机名或 IP 地址(含可选端口号)的逗号分隔列表的字符串。
topic
- 要导出到的 Kafka 主题。
kafka_conf
采用 JSON 格式直接传递给 rdkafka 库的选项/值对的对象。这是 Vertica 用于与 Kafka 进行通信的库。您可以使用此参数直接设置通过 Vertica 与 Kafka 的集成所无法提供的配置选项。有关详细信息,请参阅直接设置 Kafka 库选项。
fail_on_conf_parse_error
确定
kafka_conf
包含格式不正确的选项和值或者无效的配置属性时,该函数是否失败。默认值:FALSE
有关接受的选项和值格式,请参阅直接设置 Kafka 库选项。
有关有效配置属性的列表,请参阅 rdkafka GitHub 存储库。
示例
以下示例会将 iot_report 表中的每一行转换为 JSON 对象,然后将其导出到名为 iot-report 的 Kafka 主题。
iot_report 表包含以下列和数据:
=> SELECT * FROM iot_report;
server | date | location | id
--------+---------------------+-------------------------+----------
1 | 2016-10-11 04:09:28 | -14.86058, 112.75848 | 70982027
1 | 2017-07-02 12:37:48 | -21.42197, -127.17672 | 49494918
2 | 2017-01-20 03:05:24 | 37.17372, 136.14026 | 36670100
2 | 2017-07-29 11:38:37 | -38.99517, 171.72671 | 52049116
1 | 2017-10-19 14:04:33 | -71.72156, -36.27381 | 94328189
(5 rows)
要构建 KafkaExport 函数,请提供以下值以定义 Kafka 消息:
-
partitionColumn:使用
server
列。由于server
列值从 1 开始,而 Kafka 分区从 0 开始,因此需从server
值中减去 1。 -
keyColumn:使用
id
列。此列要求您将id
值显式转换为 VARCHAR 类型。 -
valueColumn:每条消息都会在已导出的 JSON 对象中将 date 和 location 列格式化为键/值对。
要将这些行转换为 JSON 格式,请使用 ROW 函数将 date 和 location 列转换为结构化数据。然后,将 ROW 函数传递给 TO_JSON 函数,以将数据结构编码为 JSON 对象。
填写所需的剩余函数实参并执行 KafkaExport 函数。如果成功,该函数将返回 0 行:
=> SELECT KafkaExport(server - 1, id::VARCHAR, TO_JSON(ROW(date, location))
USING PARAMETERS brokers='broker01:9092,broker02:9092',
topic='iot-results')
OVER (PARTITION BEST)
FROM iot_report;
partition | key | message | failure_reason
-----------+-----+---------+----------------
(0 rows)
使用 kafkacat 验证使用者是否在 iot-results 主题中包含 JSON 格式的数据:
$ /opt/vertica/packages/kafka/bin/kafkacat -C -t iot-results -b broker01:9092,broker02:9092
{"date":"2017-01-20 03:05:24","location":" 37.17372, 136.14026 "}
{"date":"2017-07-29 11:38:37","location":" -38.99517, 171.72671 "}
{"date":"2016-10-11 04:09:28","location":" -14.86058, 112.75848 "}
{"date":"2017-10-19 14:04:33","location":" -71.72156, -36.27381 "}
{"date":"2017-07-02 12:37:48","location":" -21.42197, -127.17672 "}