KafkaExport
Sends Vertica data to Kafka.
If Vertica successfully exports all of the rows of data to Kafka, this function returns zero rows. You can use the output of this function to copy failed messages to a secondary table for evaluation and reprocessing.
Syntax
SELECT KafkaExport(partitionColumn, keyColumn, valueColumn
USING PARAMETERS brokers='host[:port][,host...]',
topic='topicname'
[,kafka_conf='kafka_configuration_setting']
[,kafka_topic_conf='kafka_configuration_setting']
[,kafka_conf_secret='kafka_configuration_setting']
[,fail_on_conf_parse_error=Boolean])
OVER (partition_clause) FROM table;
Parameters
partitionColumn
- The target partition for the export. If you set this value to NULL, Vertica uses the default partitioning scheme. You can use this argument to send messages to partitions that map to Vertica segments.
keyColumn
- The user-defined key value associated with valueColumn. Use NULL to skip this argument.
valueColumn
- The message itself. The column is a LONG VARCHAR, allowing you to send up to 32MB of data to Kafka. However, Kafka may impose its own limits on message size.
brokers
- A string containing a comma-separated list of one or more host names or IP addresses (with optional port number) of brokers in the Kafka cluster.
topic
- The Kafka topic to which you are exporting.
kafka_conf
- A JSON string of property/value pairs to pass directly to rdkafka, the library that Vertica uses to communicate with Kafka. This parameter directly sets global configuration properties that are not available through the Vertica integration with Kafka.
For details, see Directly setting Kafka library options.
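For example, a sketch of raising rdkafka's default message size limit through this parameter. (message.max.bytes is a standard rdkafka global property; the broker, topic, table, and column names here are illustrative.)
=> SELECT KafkaExport(NULL, NULL, message
   USING PARAMETERS brokers='kafka01.example.com:9092',
                    topic='example_topic',
                    -- Allow messages up to 2 MB (rdkafka global setting)
                    kafka_conf='{"message.max.bytes": "2097152"}')
   OVER (PARTITION BEST) FROM export_messages;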
kafka_topic_conf
- A JSON string of property/value pairs to pass directly to rdkafka, the library that Vertica uses to communicate with Kafka. This parameter directly sets topic-level configuration properties that are not available through the Vertica integration with Kafka.
For details, see Directly setting Kafka library options.
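For example, a sketch that raises the delivery timeout for the target topic. (message.timeout.ms is a standard rdkafka topic-level property; the broker, topic, table, and column names are illustrative.)
=> SELECT KafkaExport(NULL, NULL, message
   USING PARAMETERS brokers='kafka01.example.com:9092',
                    topic='example_topic',
                    -- Wait up to 10 minutes for delivery (topic-level setting)
                    kafka_topic_conf='{"message.timeout.ms": "600000"}')
   OVER (PARTITION BEST) FROM export_messages;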
kafka_conf_secret
- Conceals sensitive configuration data that you must pass directly to the rdkafka library, such as passwords. This parameter accepts settings in the same format as kafka_conf. Values passed to this parameter are not logged or stored in system tables.
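For example, a sketch that passes SASL credentials without exposing the password. (security.protocol, sasl.mechanism, sasl.username, and sasl.password are standard rdkafka global properties; all other names and values are illustrative.)
=> SELECT KafkaExport(NULL, NULL, message
   USING PARAMETERS brokers='kafka01.example.com:9093',
                    topic='example_topic',
                    kafka_conf='{"security.protocol": "SASL_SSL", "sasl.mechanism": "PLAIN"}',
                    -- Credentials passed here are not logged or stored in system tables
                    kafka_conf_secret='{"sasl.username": "kafkauser", "sasl.password": "password123"}')
   OVER (PARTITION BEST) FROM export_messages;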
fail_on_conf_parse_error
- Determines whether the function fails when kafka_conf contains incorrectly formatted options and values, or invalid configuration properties.
Default value: FALSE
For accepted option and value formats, see Directly setting Kafka library options.
For a list of valid configuration properties, see the rdkafka GitHub repository.
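For example, a sketch where a misspelled property makes the call fail rather than proceed silently. (The broker, topic, table, and column names are illustrative.)
=> SELECT KafkaExport(NULL, NULL, message
   USING PARAMETERS brokers='kafka01.example.com:9092',
                    topic='example_topic',
                    -- "message.maxbytes" is misspelled; with TRUE the call fails
                    -- instead of ignoring the unrecognized property
                    kafka_conf='{"message.maxbytes": "2097152"}',
                    fail_on_conf_parse_error=TRUE)
   OVER (PARTITION BEST) FROM export_messages;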
Example
The following example converts each row of the iot_report table into a JSON object and exports it to a Kafka topic named iot-results.
The iot_report table contains the following columns and data:
=> SELECT * FROM iot_report;
server | date | location | id
--------+---------------------+-------------------------+----------
1 | 2016-10-11 04:09:28 | -14.86058, 112.75848 | 70982027
1 | 2017-07-02 12:37:48 | -21.42197, -127.17672 | 49494918
2 | 2017-01-20 03:05:24 | 37.17372, 136.14026 | 36670100
2 | 2017-07-29 11:38:37 | -38.99517, 171.72671 | 52049116
1 | 2017-10-19 14:04:33 | -71.72156, -36.27381 | 94328189
(5 rows)
To build the KafkaExport function call, provide the following values to define the Kafka message:
- partitionColumn: Use the server column. Because the server column values are one-based and Kafka partitions are zero-based, subtract 1 from the server value.
- keyColumn: Use the id column. This requires that you explicitly cast the id value to a VARCHAR type.
- valueColumn: Each message formats the date and location columns as the key/value pairs in the exported JSON object. To convert these rows to JSON format, use the ROW function to convert the date and location columns to structured data. Then, pass the ROW function to the TO_JSON function to encode the data structure as a JSON object.
Complete the remaining required function arguments and execute the KafkaExport function. If it succeeds, it returns 0 rows:
=> SELECT KafkaExport(server - 1, id::VARCHAR, TO_JSON(ROW(date, location))
USING PARAMETERS brokers='broker01:9092,broker02:9092',
topic='iot-results')
OVER (PARTITION BEST)
FROM iot_report;
partition | key | message | failure_reason
-----------+-----+---------+----------------
(0 rows)
Use kafkacat to verify that the iot-results topic contains the JSON-formatted data:
$ /opt/vertica/packages/kafka/bin/kafkacat -C -t iot-results -b broker01:9092,broker02:9092
{"date":"2017-01-20 03:05:24","location":" 37.17372, 136.14026 "}
{"date":"2017-07-29 11:38:37","location":" -38.99517, 171.72671 "}
{"date":"2016-10-11 04:09:28","location":" -14.86058, 112.75848 "}
{"date":"2017-10-19 14:04:33","location":" -71.72156, -36.27381 "}
{"date":"2017-07-02 12:37:48","location":" -21.42197, -127.17672 "}