使用 KafkaExport 生成数据

使用 KafkaExport 函数可以将数据从 Vertica 流式传输到 Kafka。您可以向此函数传递三个实参以及两个或三个参数:

SELECT KafkaExport(partitionColumn, keyColumn, valueColumn
    USING PARAMETERS brokers='host[:port][,host...]',
    topic='topicname'
    [,kafka_conf='kafka_configuration_setting']
    [,fail_on_conf_parse_error=Boolean])
OVER (partition_clause) FROM table;

partitionColumnkeyColumn 实参分别用于设置 Kafka 主题的分区和键值。您可以将这两个值中的任何一个或两个都设为 NULL。如果将分区设为 NULL,Kafka 将使用其默认分区架构(如果键值为 NULL,则随机分配分区,否则根据键值进行分配)。

valueColumn 实参是 LONG VARCHAR,其中包含要发送到 Kafka 的消息数据。Kafka 不会对消息内容强加结构。唯一能对消息格式添加的限制是指定数据使用者能够解析的内容。

您可以按照自己喜欢的任何方式自由地将数据转换为字符串。对于简单消息(如逗号分隔的列表),可以使用 CONCAT 等函数将各个值组合为消息。如果需要更复杂的数据格式(如 JSON),请考虑编写 UDx 函数,该函数接受数据列并以您所需的格式输出包含数据的 LONG VARCHAR。有关详细信息,请参阅开发用户定义的扩展 (UDx)

有关 KafkaExport 语法的详细信息,请参阅 KafkaExport

导出示例

以下示例为您显示了如何对表的多个列执行简单导出。假设您拥有以下包含一组简单的物联网 (IOT) 数据的表:

=> SELECT * FROM iot_report LIMIT 10;
 server |        date         |       location        |    id
--------+---------------------+-----------------------+----------
      1 | 2016-10-11 04:09:28 | -14.86058, 112.75848  | 70982027
      1 | 2017-07-02 12:37:48 | -21.42197, -127.17672 | 49494918
      1 | 2017-10-19 14:04:33 | -71.72156, -36.27381  | 94328189
      1 | 2018-07-11 19:35:18 | -9.41315, 102.36866   | 48366610
      1 | 2018-08-30 08:09:45 | 83.97962, 162.83848   |   967212
      2 | 2017-01-20 03:05:24 | 37.17372, 136.14026   | 36670100
      2 | 2017-07-29 11:38:37 | -38.99517, 171.72671  | 52049116
      2 | 2018-04-19 13:06:58 | 69.28989, 133.98275   | 36059026
      2 | 2018-08-28 01:09:51 | -59.71784, -144.97142 | 77310139
      2 | 2018-09-14 23:16:15 | 58.07275, 111.07354   |  4198109
(10 rows)

=> \d iot_report
                                       List of Fields by Tables
 Schema |   Table    |  Column  |    Type     | Size | Default | Not Null | Primary Key | Foreign Key
--------+------------+----------+-------------+------+---------+----------+-------------+-------------
 public | iot_report | server   | int         |    8 |         | f        | f           |
 public | iot_report | date     | timestamp   |    8 |         | f        | f           |
 public | iot_report | location | varchar(40) |   40 |         | f        | f           |
 public | iot_report | id       | int         |    8 |         | f        | f           |
(4 rows)

您希望将此表中的数据发送到名为 iot_results 的 Kafka 主题以供其他应用程序使用。查看相应数据和 iot_report 的结构后,您可能会做出以下决定:

  • server 列与 iot_report 中的分区非常匹配。Kafka 主题中有三个分区,server 列中的值介于 1 和 3 之间。假设分区列的值范围更大(例如,介于 1 和 100 之间)。那么您就可以使用取模运算符 (%) 将这些值强制转换为与分区数相同的范围 (server % 3)。
    这些值的复杂之处在于 server 列中的值从 1 开始(该列中的最小值为 1)。Kafka 的分区编号架构从零开始。因此,调整 server 列中的值时必须减去 1。

  • id 列可以充当键。此列的数据类型为 INTEGER。KafkaExport 函数的键值应为 VARCHAR。Vertica 不会自动将 INTEGER 值转换为 VARCHAR,因此您必须在函数调用中显式转换该值。

  • iot_report 主题的使用者希望值采用逗号分隔格式。您可以使用对 CONCAT 函数的嵌套调用将 date 和 location 列中的值合并到一个 VARCHAR 中。

您需要了解的最后一条信息是 Kafka 群集中代理的主机名和端口号。在此示例中,有两个名为 kafka01 和 kafka03 的代理在端口 6667(Hortonworks 群集使用的端口)上运行。获取所有这些信息后,即可随时导出数据。

以下示例显示了如何导出 iot_report 的内容:

=> SELECT KafkaExport(server - 1, id::VARCHAR,
   CONCAT(CONCAT(date, ', '), location)
   USING PARAMETERS brokers='kafka01:6667,kafka03:6667',
   topic='iot_results') OVER (PARTITION BEST) FROM iot_report;
 partition | key | message | failure_reason
-----------+-----+---------+----------------
(0 rows)

KafkaExport 返回了 0 行,这意味着 Vertica 能够将您的所有数据发送到 Kafka,而不会出现任何错误。

关于此示例的其他注意事项:

  • CONCAT 函数会自动为您将 data 列的 DATETIME 值转换为 VARCHAR,以便您无需对其进行显式转换。

  • 需要使用两个嵌套的 CONCAT 函数将 date 字段与逗号连接起来,然后将生成的字符串与 location 字段连接起来。

  • 向 message 字段添加第三列需要再进行两次 CONCAT 函数调用(一次用于在 location 列后连接逗号,一次用于连接其他列的值)。在加载相当于几列的数据之后,使用 CONCAT 将变得棘手。

在 Kafka 端,您将看到作为 KafkaExport 函数的 valueColumn(第三个)实参发送的任何内容。在上面的示例中,此内容为 CSV 列表。如果在运行示例查询之前已为 iot_results 主题启动控制台使用者,您将在查询运行时看到以下输出:

$ /opt/kafka/bin/kafka-console-consumer.sh --topic iot_results --zookeeper localhost
2017-10-10 12:08:33, 78.84883, -137.56584
2017-12-06 16:50:57, -25.33024, -157.91389
2018-01-12 21:27:39, 82.34027, 116.66703
2018-08-19 00:02:18, 13.00436, 85.44815
2016-10-11 04:09:28, -14.86058, 112.75848
2017-07-02 12:37:48, -21.42197, -127.17672
2017-10-19 14:04:33, -71.72156, -36.27381
2018-07-11 19:35:18, -9.41315, 102.36866
2018-08-30 08:09:45, 83.97962, 162.83848
2017-01-20 03:05:24, 37.17372, 136.14026
2017-07-29 11:38:37, -38.99517, 171.72671
2018-04-19 13:06:58, 69.28989, 133.98275
2018-08-28 01:09:51, -59.71784, -144.97142
2018-09-14 23:16:15, 58.07275, 111.07354

KafkaExport 的返回值

KafkaExport 将输出 Kafka 已拒绝的所有行。例如,假设您在前面的示例中忘记将分区列调整为从零开始。则导出到 Kafka 的某些行会指定一个不存在的分区。在这种情况下,Kafka 会拒绝这些行,并且 KafkaExport 会以表格式报告它们:

=> SELECT KafkaExport(server, id::VARCHAR,
   CONCAT(CONCAT(date, ', '), location)
   USING PARAMETERS brokers='kafka01:6667,kafka03:6667',
   topic='iot_results') OVER (PARTITION BEST) FROM iot_report;
 partition |   key    |                  message                    |      failure_reason
-----------+----------+---------------------------------------------+--------------------------
         3 | 40492866 | 2017-10-10 12:08:33, 78.84883, -137.56584,  | Local: Unknown partition
         3 | 73846006 | 2017-12-06 16:50:57, -25.33024, -157.91389, | Local: Unknown partition
         3 | 45020829 | 2018-01-12 21:27:39, 82.34027, 116.66703,   | Local: Unknown partition
         3 | 27462612 | 2018-08-19 00:02:18, 13.00436, 85.44815,    | Local: Unknown partition
(4 rows)

您可以通过创建表保存拒绝的行来捕获此输出。然后,使用 INSERT 语句插入 KafkaExport 的结果:

=> CREATE TABLE export_rejects (partition INTEGER, key VARCHAR, message LONG VARCHAR, failure_reason VARCHAR);
CREATE TABLE
=> INSERT INTO export_rejects SELECT KafkaExport(server, id::VARCHAR,
   CONCAT(CONCAT(date, ', '), location)
   USING PARAMETERS brokers='kafka01:6667,kafka03:6667',
   topic='iot_results') OVER (PARTITION BEST) FROM iot_report;
 OUTPUT
--------
      4
(1 row)
=> SELECT * FROM export_rejects;
 partition |   key    |                  message                   |      failure_reason
-----------+----------+--------------------------------------------+--------------------------
         3 | 27462612 | 2018-08-19 00:02:18, 13.00436, 85.44815    | Local: Unknown partition
         3 | 40492866 | 2017-10-10 12:08:33, 78.84883, -137.56584  | Local: Unknown partition
         3 | 73846006 | 2017-12-06 16:50:57, -25.33024, -157.91389 | Local: Unknown partition
         3 | 45020829 | 2018-01-12 21:27:39, 82.34027, 116.66703   | Local: Unknown partition
(4 rows)