KafkaExport

Sends Vertica data to Kafka.

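If Vertica successfully exports all rows to Kafka, this function returns zero rows. If some messages fail, the output contains those messages, using the partition, key, message, and failure_reason columns shown in the example below. You can use this output to copy failed messages to a secondary table for evaluation and reprocessing.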
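The following Python sketch is a rough illustration of the reprocessing step. It sorts failed-message rows by failure reason and selects the ones the caller treats as retryable. The column names come from the example output. The FailedMessage type, the helper functions, the row values, and the failure_reason strings are hypothetical. In practice, the rows would come from the function's own output.

```python
from collections import defaultdict
from typing import NamedTuple, Optional


class FailedMessage(NamedTuple):
    """One output row, using the column names from the example output (hypothetical type)."""
    partition: Optional[int]
    key: Optional[str]
    message: str
    failure_reason: str


def group_by_reason(rows):
    """Group failed messages by failure_reason so each cause can be reviewed separately."""
    groups = defaultdict(list)
    for row in rows:
        groups[row.failure_reason].append(row)
    return dict(groups)


def retry_batch(rows, retryable_reasons):
    """Select the rows whose failure reason is in a caller-defined set of transient causes."""
    return [row for row in rows if row.failure_reason in retryable_reasons]


# Hypothetical rows; the failure_reason strings are invented for illustration only.
failed = [
    FailedMessage(0, "70982027", '{"date":"2016-10-11 04:09:28"}', "timed out"),
    FailedMessage(1, "36670100", '{"date":"2017-01-20 03:05:24"}', "message too large"),
    FailedMessage(0, "49494918", '{"date":"2017-07-02 12:37:48"}', "timed out"),
]

by_reason = group_by_reason(failed)
to_retry = retry_batch(failed, retryable_reasons={"timed out"})
print({reason: len(rows) for reason, rows in by_reason.items()})
print([row.key for row in to_retry])
```

This separates transient failures, which may succeed on a second attempt, from failures such as oversized messages, which need the data or the Kafka limits changed first.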

Syntax

SELECT KafkaExport(partitionColumn, keyColumn, valueColumn
    USING PARAMETERS brokers='host[:port][,host...]',
    topic='topicname'
    [,kafka_conf='kafka_configuration_setting']
    [,kafka_topic_conf='kafka_configuration_setting']
    [,kafka_conf_secret='kafka_configuration_setting']
    [,fail_on_conf_parse_error=Boolean])
OVER (partition_clause) FROM table;

Parameters

partitionColumn
The target partition for the export. If you set this value to NULL, Vertica uses the default partitioning scheme. You can use the partition argument to send messages to partitions that map to Vertica segments.
keyColumn
The user-defined key value associated with valueColumn. Use NULL to skip this argument.
valueColumn
The message itself. This column is a LONG VARCHAR, so each message can contain up to 32MB of data. However, Kafka might impose its own limits on message size.
brokers
A string containing a comma-separated list of one or more host names or IP addresses (with optional port number) of brokers in the Kafka cluster.
topic
The Kafka topic to which you are exporting.
kafka_conf

A JSON string of property/value pairs to pass directly to rdkafka, the library that Vertica uses to communicate with Kafka. This parameter directly sets global configuration properties that are not available through the Vertica integration with Kafka.

For details, see Directly setting Kafka library options.

kafka_topic_conf

A JSON string of property/value pairs to pass directly to rdkafka, the library that Vertica uses to communicate with Kafka. This parameter directly sets topic-level configuration properties that are not available through the Vertica integration with Kafka.

For details, see Directly setting Kafka library options.

kafka_conf_secret

Conceals sensitive configuration data that you must pass directly to the rdkafka library, such as passwords. This parameter accepts settings in the same format as kafka_conf.

Values passed to this parameter are not logged or stored in system tables.

fail_on_conf_parse_error

Determines whether the function fails when kafka_conf contains incorrectly formatted options and values, or invalid configuration properties.

Default Value: FALSE

For accepted option and value formats, see Directly setting Kafka library options.

For a list of valid configuration properties, see the rdkafka GitHub repository.
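Several of these parameters are structured strings. The brokers value is a comma-separated host[:port] list, and kafka_conf, kafka_topic_conf, and kafka_conf_secret take JSON objects. Generating these strings in client code can therefore be less error-prone than typing them by hand. The following Python sketch builds a USING PARAMETERS clause from a broker list and configuration dictionaries. It also rejects configuration that is not a JSON object, which is a client-side analogue of the errors that fail_on_conf_parse_error guards against and not Vertica's own check. Finally, it doubles single quotes so the values are safe inside SQL string literals.

Several details are assumptions:
- The helper names are hypothetical.
- The rdkafka property values are examples only.
- Passing the values as JSON strings is an assumption.

Verify property names against the rdkafka documentation.

```python
import json


def sql_literal(text: str) -> str:
    """Quote a value as a SQL string literal, doubling any embedded single quotes."""
    return "'" + text.replace("'", "''") + "'"


def conf_json(conf: dict) -> str:
    """Serialize a configuration dict as a compact JSON object of property/value pairs."""
    if not isinstance(conf, dict):
        raise TypeError("configuration must be a JSON object (a dict)")
    return json.dumps(conf, separators=(",", ":"))


def using_parameters(brokers, topic, kafka_conf=None, kafka_topic_conf=None,
                     kafka_conf_secret=None, fail_on_conf_parse_error=False):
    """Build a USING PARAMETERS clause for KafkaExport (hypothetical helper)."""
    # brokers: list of (host, port) pairs; a port of None omits ":port".
    broker_list = ",".join(host if port is None else f"{host}:{port}"
                           for host, port in brokers)
    params = [f"brokers={sql_literal(broker_list)}", f"topic={sql_literal(topic)}"]
    for name, conf in (("kafka_conf", kafka_conf),
                       ("kafka_topic_conf", kafka_topic_conf),
                       ("kafka_conf_secret", kafka_conf_secret)):
        if conf is not None:
            params.append(f"{name}={sql_literal(conf_json(conf))}")
    if fail_on_conf_parse_error:
        params.append("fail_on_conf_parse_error=true")
    return "USING PARAMETERS " + ",\n    ".join(params)


clause = using_parameters(
    brokers=[("broker01", 9092), ("broker02", 9092)],
    topic="iot-results",
    kafka_conf={"queue.buffering.max.ms": "100"},      # global rdkafka property, example value
    kafka_topic_conf={"request.required.acks": "-1"},  # topic-level rdkafka property, example value
    fail_on_conf_parse_error=True,
)
print(clause)
```

Keeping secrets in a separate dictionary that is passed only as kafka_conf_secret keeps them out of the kafka_conf string.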

Example

The following example converts each row from the iot_report table into a JSON object and exports it to a Kafka topic named iot-results.

The iot_report table contains the following columns and data:

=> SELECT * FROM iot_report;
 server |        date         |        location         |    id
--------+---------------------+-------------------------+----------
      1 | 2016-10-11 04:09:28 |  -14.86058, 112.75848   | 70982027
      1 | 2017-07-02 12:37:48 |  -21.42197, -127.17672  | 49494918
      2 | 2017-01-20 03:05:24 |  37.17372, 136.14026    | 36670100
      2 | 2017-07-29 11:38:37 |  -38.99517, 171.72671   | 52049116
      1 | 2017-10-19 14:04:33 |  -71.72156, -36.27381   | 94328189
(5 rows)

To build the KafkaExport function, provide the following values to define the Kafka message:

  • partitionColumn: Use the server column. Because the server column values are one-based and Kafka partitions are zero-based, subtract 1 from the server value.

  • keyColumn: Use the id column. This requires that you explicitly cast the id value to a VARCHAR type.

  • valueColumn: Format the date and location columns as key/value pairs in the exported JSON object.

    To convert these rows to JSON, use the ROW function to convert the date and location columns to structured data. Then pass the output of ROW to the TO_JSON function, which encodes the data structure as a JSON object.
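You can mirror the three argument definitions above in Python to preview each exported message before running the export. The following sketch applies the same mapping to the sample rows:
- The partition is server - 1.
- The key is the id as a string.
- The value is a compact JSON object with date and location keys.

This imitates TO_JSON(ROW(date, location)) only for this simple two-field case and is not Vertica's implementation. The location strings, including their padding, are copied from the kafkacat output shown later. Dates are represented as plain strings, which is an assumption for illustration.

```python
import json

# Sample rows from iot_report: (server, date, location, id).
# location strings are copied from the kafkacat output, including their padding.
IOT_REPORT = [
    (1, "2016-10-11 04:09:28", " -14.86058, 112.75848  ", 70982027),
    (1, "2017-07-02 12:37:48", " -21.42197, -127.17672 ", 49494918),
    (2, "2017-01-20 03:05:24", " 37.17372, 136.14026   ", 36670100),
    (2, "2017-07-29 11:38:37", " -38.99517, 171.72671  ", 52049116),
    (1, "2017-10-19 14:04:33", " -71.72156, -36.27381  ", 94328189),
]


def to_message(server, date, location, row_id):
    """Return (partition, key, value) following the example's KafkaExport arguments."""
    partition = server - 1  # servers are numbered from 1, Kafka partitions from 0
    key = str(row_id)       # mirrors id::VARCHAR
    # Compact JSON object mimicking TO_JSON(ROW(date, location)) for these two fields.
    value = json.dumps({"date": date, "location": location}, separators=(",", ":"))
    return partition, key, value


messages = [to_message(*row) for row in IOT_REPORT]
for partition, key, value in messages:
    print(partition, key, value)
```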

Supply the remaining required parameters (brokers and topic) and the OVER clause, then run KafkaExport. If the export succeeds, the function returns zero rows:

=> SELECT KafkaExport(server - 1, id::VARCHAR, TO_JSON(ROW(date, location))
                      USING PARAMETERS brokers='broker01:9092,broker02:9092',
                      topic='iot-results')
   OVER (PARTITION BEST)
   FROM iot_report;
 partition | key | message | failure_reason
-----------+-----+---------+----------------
(0 rows)

Use kafkacat to verify that the iot-results topic contains the JSON-formatted messages:

$ /opt/vertica/packages/kafka/bin/kafkacat -C -t iot-results -b broker01:9092,broker02:9092
{"date":"2017-01-20 03:05:24","location":" 37.17372, 136.14026   "}
{"date":"2017-07-29 11:38:37","location":" -38.99517, 171.72671  "}
{"date":"2016-10-11 04:09:28","location":" -14.86058, 112.75848  "}
{"date":"2017-10-19 14:04:33","location":" -71.72156, -36.27381  "}
{"date":"2017-07-02 12:37:48","location":" -21.42197, -127.17672 "}
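If you capture the kafkacat output (for example, by redirecting it to a file), a short script can check the result. It confirms that every message is a JSON object with the expected keys and that every source row arrived exactly once. The following Python sketch uses the five lines above as input, with the expected dates taken from the iot_report table. Kafka guarantees ordering only within a partition, and messages from different partitions appear interleaved above. The check therefore compares sets rather than order. The check_messages helper is hypothetical.

```python
import json

# kafkacat output from the example above, one message per line.
KAFKACAT_OUTPUT = """\
{"date":"2017-01-20 03:05:24","location":" 37.17372, 136.14026   "}
{"date":"2017-07-29 11:38:37","location":" -38.99517, 171.72671  "}
{"date":"2016-10-11 04:09:28","location":" -14.86058, 112.75848  "}
{"date":"2017-10-19 14:04:33","location":" -71.72156, -36.27381  "}
{"date":"2017-07-02 12:37:48","location":" -21.42197, -127.17672 "}
"""

# The date column of iot_report; each value should appear exactly once.
EXPECTED_DATES = {
    "2016-10-11 04:09:28",
    "2017-07-02 12:37:48",
    "2017-01-20 03:05:24",
    "2017-07-29 11:38:37",
    "2017-10-19 14:04:33",
}


def check_messages(text, expected_dates):
    """Parse one JSON object per non-empty line; return (missing dates, duplicate count)."""
    messages = [json.loads(line) for line in text.splitlines() if line.strip()]
    for msg in messages:
        if set(msg) != {"date", "location"}:
            raise ValueError(f"unexpected keys: {sorted(msg)}")
    dates = [msg["date"] for msg in messages]
    # Kafka orders messages only within a partition, so compare contents, not order.
    missing = expected_dates - set(dates)
    duplicates = len(dates) - len(set(dates))
    return missing, duplicates


missing, duplicates = check_messages(KAFKACAT_OUTPUT, EXPECTED_DATES)
print("missing:", sorted(missing), "duplicates:", duplicates)
```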

See also

Producing data using KafkaExport