Producing data for Kafka

In addition to consuming data from Kafka, Vertica can produce data for Kafka. Stream the following data from Vertica for consumption by other Kafka consumers:

  • Vertica analytics results. Use KafkaExport to export Vertica tables and queries.

  • Health and performance data from Data Collector tables. Create push-based notifiers to send this data for consumption by third-party monitoring tools.

  • Ad hoc messages. Use NOTIFY to signal that tasks such as stored procedures are complete.

1 - Producing data using KafkaExport

The KafkaExport function lets you stream data from Vertica to Kafka. You pass this function three arguments, two required parameters, and two optional parameters:

SELECT KafkaExport(partitionColumn, keyColumn, valueColumn
    USING PARAMETERS brokers='host[:port][,host...]',
    topic='topicname'
    [,kafka_conf='kafka_configuration_setting']
    [,fail_on_conf_parse_error=Boolean])
OVER (partition_clause) FROM table;

The partitionColumn and keyColumn arguments set the Kafka topic's partition and key value, respectively. You can set either or both of these values to NULL. If you set the partition to NULL, Kafka uses its default partitioning scheme (either randomly assigning partitions if the key value is NULL, or based on the key value if it is not).
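
For example, a minimal sketch (using a hypothetical messages table, message_text column, and broker address) passes NULL for both the partition and key columns so that Kafka chooses the partition itself:

=> SELECT KafkaExport(NULL, NULL, message_text
   USING PARAMETERS brokers='kafka01.example.com:9092',
   topic='example_topic') OVER (PARTITION BEST) FROM messages;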

The valueColumn argument is a LONG VARCHAR containing the message data that you want to send to Kafka. Kafka does not impose structure on the message content. The only restriction on the message format is that the consumers of the data must be able to parse it.

You are free to convert your data into a string in any way you like. For simple messages (such as a comma-separated list), you can use functions such as CONCAT to assemble your values into a message. If you need a more complex data format, such as JSON, consider writing a UDx function that accepts columns of data and outputs a LONG VARCHAR containing the data in the format you require. See Developing user-defined extensions (UDxs) for more information.
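
For a very small JSON payload, you may be able to assemble the string directly in SQL before reaching for a UDx. The following sketch (with a hypothetical sensor_readings table and columns) builds a JSON object using the standard || concatenation operator; note that it does not handle escaping of special characters in the values:

=> SELECT KafkaExport(NULL, NULL,
   '{"id": ' || id::VARCHAR || ', "reading": ' || reading::VARCHAR || '}'
   USING PARAMETERS brokers='kafka01.example.com:9092',
   topic='sensor_json') OVER (PARTITION BEST) FROM sensor_readings;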

See KafkaExport for detailed information about KafkaExport's syntax.

Export example

This example shows you how to perform a simple export of several columns of a table. Suppose you have the following table containing a simple set of Internet of Things (IoT) data:

=> SELECT * FROM iot_report LIMIT 10;
 server |        date         |       location        |    id
--------+---------------------+-----------------------+----------
      1 | 2016-10-11 04:09:28 | -14.86058, 112.75848  | 70982027
      1 | 2017-07-02 12:37:48 | -21.42197, -127.17672 | 49494918
      1 | 2017-10-19 14:04:33 | -71.72156, -36.27381  | 94328189
      1 | 2018-07-11 19:35:18 | -9.41315, 102.36866   | 48366610
      1 | 2018-08-30 08:09:45 | 83.97962, 162.83848   |   967212
      2 | 2017-01-20 03:05:24 | 37.17372, 136.14026   | 36670100
      2 | 2017-07-29 11:38:37 | -38.99517, 171.72671  | 52049116
      2 | 2018-04-19 13:06:58 | 69.28989, 133.98275   | 36059026
      2 | 2018-08-28 01:09:51 | -59.71784, -144.97142 | 77310139
      2 | 2018-09-14 23:16:15 | 58.07275, 111.07354   |  4198109
(10 rows)

=> \d iot_report
                                       List of Fields by Tables
 Schema |   Table    |  Column  |    Type     | Size | Default | Not Null | Primary Key | Foreign Key
--------+------------+----------+-------------+------+---------+----------+-------------+-------------
 public | iot_report | server   | int         |    8 |         | f        | f           |
 public | iot_report | date     | timestamp   |    8 |         | f        | f           |
 public | iot_report | location | varchar(40) |   40 |         | f        | f           |
 public | iot_report | id       | int         |    8 |         | f        | f           |
(4 rows)

You want to send the data in this table to a Kafka topic named iot_results for consumption by other applications. Looking at the data and the structure of the iot_report table, you might decide the following:

  • The server column is a good match for the partitions in the iot_results topic. There are three partitions in the Kafka topic, and the values in the server column are between 1 and 3. Suppose the partition column had a larger range of values (for example, between 1 and 100). Then you could use the modulo operator (%) to coerce the values into the same range as the number of partitions (server % 3); see the sketch after this list.
    A complication with these values is that the values in the server column are 1-based (the lowest value in the column is 1). Kafka's partition numbering scheme is zero-based, so you must adjust the values in the server column by subtracting 1 from them.

  • The id column can act as the key. This column has a data type of INTEGER. The KafkaExport function expects the key value to be a VARCHAR. Vertica does not automatically cast INTEGER values to VARCHAR, so you must explicitly cast the value in your function call.

  • The consumers of the iot_report topic expect values in comma-separated format. You can combine the values from the date and location columns into a single VARCHAR using nested calls to the CONCAT function.
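
For instance, if the server column held values between 1 and 100 instead of 1 and 3, a sketch of the export (hypothetical, not the documented example that follows) could combine the zero-based adjustment with the modulo operator to map every value into the topic's three partitions:

=> SELECT KafkaExport((server - 1) % 3, id::VARCHAR,
   CONCAT(CONCAT(date, ', '), location)
   USING PARAMETERS brokers='kafka01:6667,kafka03:6667',
   topic='iot_results') OVER (PARTITION BEST) FROM iot_report;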

The final piece of information you need to know is the host names and port numbers of the brokers in your Kafka cluster. In this example, there are two brokers named kafka01 and kafka03, running on port 6667 (the port that Hortonworks clusters use). Once you have all of this information, you are ready to export your data.

The following example shows how you might export the contents of iot_report:

=> SELECT KafkaExport(server - 1, id::VARCHAR,
   CONCAT(CONCAT(date, ', '), location)
   USING PARAMETERS brokers='kafka01:6667,kafka03:6667',
   topic='iot_results') OVER (PARTITION BEST) FROM iot_report;
 partition | key | message | failure_reason
-----------+-----+---------+----------------
(0 rows)

KafkaExport returned 0 rows, which means Vertica was able to send all of your data to Kafka without any errors.

Other things to note about the example:

  • The CONCAT function automatically converts the date column's TIMESTAMP value to a VARCHAR for you, so you do not need to explicitly cast it.

  • Two nested CONCAT functions are necessary to concatenate the date field with a comma, and the resulting string with the location field.

  • Adding a third column to the message field would require two additional CONCAT function calls (one to concatenate a comma after the location column, and one to concatenate the additional column's value). Using CONCAT becomes messy after just a few columns' worth of data; see the alternative sketch below.
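
If the nesting becomes hard to read, one possible alternative (a sketch, not part of the documented example) is the standard || string-concatenation operator, which stays flat as you add columns. The explicit cast of date keeps the conversion obvious:

=> SELECT KafkaExport(server - 1, id::VARCHAR,
   date::VARCHAR || ', ' || location || ', ' || server::VARCHAR
   USING PARAMETERS brokers='kafka01:6667,kafka03:6667',
   topic='iot_results') OVER (PARTITION BEST) FROM iot_report;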

On the Kafka side, you will see whatever you sent as the valueColumn (third) argument of the KafkaExport function. In the above example, this is a CSV list. If you started a console consumer for the iot_results topic before running the example query, you would see the following output when the query runs:

$ /opt/kafka/bin/kafka-console-consumer.sh --topic iot_results --zookeeper localhost
2017-10-10 12:08:33, 78.84883, -137.56584
2017-12-06 16:50:57, -25.33024, -157.91389
2018-01-12 21:27:39, 82.34027, 116.66703
2018-08-19 00:02:18, 13.00436, 85.44815
2016-10-11 04:09:28, -14.86058, 112.75848
2017-07-02 12:37:48, -21.42197, -127.17672
2017-10-19 14:04:33, -71.72156, -36.27381
2018-07-11 19:35:18, -9.41315, 102.36866
2018-08-30 08:09:45, 83.97962, 162.83848
2017-01-20 03:05:24, 37.17372, 136.14026
2017-07-29 11:38:37, -38.99517, 171.72671
2018-04-19 13:06:58, 69.28989, 133.98275
2018-08-28 01:09:51, -59.71784, -144.97142
2018-09-14 23:16:15, 58.07275, 111.07354

KafkaExport's return value

KafkaExport outputs any rows that Kafka rejected. For example, suppose you forgot to adjust the partition column to be zero-based in the previous example. Then some of the rows exported to Kafka would specify a partition that does not exist. In this case, Kafka rejects these rows, and KafkaExport reports them in table format:

=> SELECT KafkaExport(server, id::VARCHAR,
   CONCAT(CONCAT(date, ', '), location)
   USING PARAMETERS brokers='kafka01:6667,kafka03:6667',
   topic='iot_results') OVER (PARTITION BEST) FROM iot_report;
 partition |   key    |                  message                    |      failure_reason
-----------+----------+---------------------------------------------+--------------------------
         3 | 40492866 | 2017-10-10 12:08:33, 78.84883, -137.56584,  | Local: Unknown partition
         3 | 73846006 | 2017-12-06 16:50:57, -25.33024, -157.91389, | Local: Unknown partition
         3 | 45020829 | 2018-01-12 21:27:39, 82.34027, 116.66703,   | Local: Unknown partition
         3 | 27462612 | 2018-08-19 00:02:18, 13.00436, 85.44815,    | Local: Unknown partition
(4 rows)

You can capture this output by creating a table to hold the rejects. Then use an INSERT statement to insert KafkaExport's results:

=> CREATE TABLE export_rejects (partition INTEGER, key VARCHAR, message LONG VARCHAR, failure_reason VARCHAR);
CREATE TABLE
=> INSERT INTO export_rejects SELECT KafkaExport(server, id::VARCHAR,
   CONCAT(CONCAT(date, ', '), location)
   USING PARAMETERS brokers='kafka01:6667,kafka03:6667',
   topic='iot_results') OVER (PARTITION BEST) FROM iot_report;
 OUTPUT
--------
      4
(1 row)
=> SELECT * FROM export_rejects;
 partition |   key    |                  message                   |      failure_reason
-----------+----------+--------------------------------------------+--------------------------
         3 | 27462612 | 2018-08-19 00:02:18, 13.00436, 85.44815    | Local: Unknown partition
         3 | 40492866 | 2017-10-10 12:08:33, 78.84883, -137.56584  | Local: Unknown partition
         3 | 73846006 | 2017-12-06 16:50:57, -25.33024, -157.91389 | Local: Unknown partition
         3 | 45020829 | 2018-01-12 21:27:39, 82.34027, 116.66703   | Local: Unknown partition
(4 rows)

2 - Producing Kafka messages using notifiers

You can use notifiers to help you monitor your Vertica database using third-party Kafka-aware tools by producing messages to a Kafka topic. You can directly publish messages (for example, from a SQL script to indicate that a long-running query has finished). Notifiers can also automatically send messages when a component in the Data Collector tables is updated.

2.1 - Creating a Kafka notifier

The following procedure creates a Kafka notifier. At a minimum, a notifier defines:

  • A unique name.

  • A message protocol. This is kafka:// when sending messages to Kafka.

  • The server to communicate with. For Kafka, this is the address and port number of a Kafka broker.

  • The maximum message buffer size. If the queue of messages to be sent via the notifier exceeds this limit, messages are dropped.

You create the notifier with CREATE NOTIFIER. This example creates a notifier named load_progress_notifier that sends messages via the Kafka broker running on kafka01.example.com on port 9092:

=> CREATE NOTIFIER load_progress_notifier
    ACTION 'kafka://kafka01.example.com:9092'
    MAXMEMORYSIZE '10M';

While not required, it's best practice to create notifiers that use an encrypted connection. The following example creates a notifier that uses an encrypted connection and verifies the Kafka server's certificate with the provided CA bundle:

=> CREATE NOTIFIER encrypted_notifier
    ACTION 'kafka://127.0.0.1:9092'
    MAXMEMORYSIZE '10M'
    TLSMODE 'verify-ca'
    CA BUNDLE ca_bundle;

Follow this procedure to create or alter notifiers for Kafka endpoints that use SASL_SSL. Note that you must repeat this procedure whenever you change the TLSMODE, certificates, or CA bundle for a given notifier.

  1. Use CREATE or ALTER to disable the notifier while setting the TLSMODE, certificate, and CA bundle.

    => ALTER NOTIFIER encrypted_notifier
        DISABLE
        TLSMODE 'verify-ca'
        CA BUNDLE ca_bundle2;
    
  2. ALTER the notifier and set the proper rdkafka adapter parameters for SASL_SSL.

    => ALTER NOTIFIER encrypted_notifier PARAMETERS
      'sasl.username=user;sasl.password=password;sasl.mechanism=PLAIN;security.protocol=SASL_SSL';
    
  3. Enable the notifier.

    => ALTER NOTIFIER encrypted_notifier ENABLE;
    

2.2 - Sending individual messages via a Kafka notifier

You can send an individual message via a Kafka notifier using the NOTIFY function. This feature is useful for reporting the progress of SQL scripts such as ETL tasks to third-party reporting tools.

You pass this function three string values:

  • The message to send.

  • The name of the notifier that sends the message.

  • The Kafka topic to receive the message.

For example, suppose you want to send the message "Daily load finished" to the vertica_notifications topic of the Kafka cluster defined in the load_progress_notifier notifier created earlier. Then you could execute the following statement:

=> SELECT NOTIFY('Daily load finished.',
                 'load_progress_notifier',
                 'vertica_notifications');
 NOTIFY
--------
 OK
(1 row)

The message the notifier sends to Kafka is in JSON format. You can see the resulting message by using the console consumer on a Kafka node. For example:

$ /opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
                                           --from-beginning \
                                           --topic vertica_notifications \
                                           --max-messages 1

{"_db":"vmart","_schema":"v_internal","_table":"dc_notifications",
"channel":"vertica_notifications","message":"Daily load finished.",
"node_name":"v_vmart_node0001","notifier":"load_progress_notifier",
"request_id":2,"session_id":"v_vmart_node0001-463079:0x4ba6f",
"statement_id":-1,"time":"2018-06-19 09:48:42.314181-04",
"transaction_id":45035996275565458,"user_id":45035996273704962,
"user_name":"dbadmin"}

Processed a total of 1 messages

2.3 - Monitoring DC tables with Kafka notifiers

The Vertica Data Collector (DC) tables monitor many different database functions. You can have a notifier automatically send a message to a Kafka endpoint when a DC component updates. You can query the DATA_COLLECTOR table to get a list of the DC components.
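
For example, a quick way to find component names to monitor (a sketch; the exact output depends on your database) is to query the DATA_COLLECTOR system table:

=> SELECT DISTINCT component, description
   FROM v_monitor.data_collector
   ORDER BY component;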

You configure the notifier to send DC component updates to Kafka using the function SET_DATA_COLLECTOR_NOTIFY_POLICY.

To be notified of failed login attempts, you can create a notifier that sends a notification when the DC component LoginFailures updates. The TLSMODE 'verify-ca' verifies that the server's certificate is signed by a trusted CA.

=> CREATE NOTIFIER vertica_stats ACTION 'kafka://kafka01.example.com:9092' MAXMEMORYSIZE '10M' TLSMODE 'verify-ca';
CREATE NOTIFIER
=> SELECT SET_DATA_COLLECTOR_NOTIFY_POLICY('LoginFailures','vertica_stats', 'vertica_notifications', true);
SET_DATA_COLLECTOR_NOTIFY_POLICY
----------------------------------
 SET
(1 row)

Like the messages sent via the NOTIFY function, the data sent to Kafka from the DC components is in JSON format. The previous example results in messages like the following being sent to the vertica_notifications Kafka topic:

{"_db":"vmart","_schema":"v_internal","_table":"dc_login_failures",
"authentication_method":"Reject","client_authentication_name":"",
"client_hostname":"::1","client_label":"","client_os_user_name":"dbadmin",
"client_pid":481535,"client_version":"","database_name":"alice",
"effective_protocol":"3.8","node_name":"v_vmart_node0001",
"reason":"INVALID USER","requested_protocol":"3.8","ssl_client_fingerprint":"",
"time":"2018-06-19 14:51:22.437035-04","user_name":"alice"}

Viewing notification policies for a DC component

Use the GET_DATA_COLLECTOR_NOTIFY_POLICY function to list the policies set for a DC component.

=> SELECT GET_DATA_COLLECTOR_NOTIFY_POLICY('LoginFailures');
                   GET_DATA_COLLECTOR_NOTIFY_POLICY
----------------------------------------------------------------------
 Notifiable;  Notifier: vertica_stats; Channel: vertica_notifications
(1 row)

Disabling a notification policy

You can call the SET_DATA_COLLECTOR_NOTIFY_POLICY function with its fourth argument set to FALSE to disable a notification policy. The following example disables the notify policy for the LoginFailures component:

=> SELECT SET_DATA_COLLECTOR_NOTIFY_POLICY('LoginFailures','vertica_stats', 'vertica_notifications', false);
 SET_DATA_COLLECTOR_NOTIFY_POLICY
----------------------------------
 SET
(1 row)

=> SELECT GET_DATA_COLLECTOR_NOTIFY_POLICY('LoginFailures');
 GET_DATA_COLLECTOR_NOTIFY_POLICY
----------------------------------
 Not notifiable;
(1 row)