This is the multi-page printable view of this section. Click here to print.

Return to the regular view of this page.

Kafka function reference

This section lists the functions that make up Vertica's Kafka integration feature.

This section lists the functions that make up Vertica's Kafka integration feature.

1 - KafkaAvroParser

The KafkaAvroParser parses Avro-formatted Kafka messages and loads them into a regular Vertica table or a Vertica flex table.

The KafkaAvroParser parses Avro-formatted Kafka messages and loads them into a regular Vertica table or a Vertica flex table.

Syntax

KafkaAvroParser(param=value[,...])
enforce_length
When set to TRUE, rejects the row if any value is too wide to fit into its column. When using the default setting (FALSE) , the parser truncates any value that is too wide to fit within the column's maximum width.
reject_on_materialized_type_error
When set to TRUE, rejects the row if it contains a materialized column value that cannot be mapped into the materialized column's data type.
flatten_maps
If set to TRUE, flattens all Avro maps.
flatten_arrays
If set to TRUE, flattens Avro arrays.
flatten_records
If set to TRUE, flattens all Avro records.
external_schema
The schema of the Avro file as a JSON string. If this parameter is not specified, the parser assumes that each message has the schema on it. If you are using a schema registry, do not use this parameter.
codec
The codec in which the Avro file was written. Valid values are:
  • default: Data is not compressed and codec is not needed

  • deflate: Data is compressed using the deflate codec

  • snappy: Snappy compression

with_metadata
If set to TRUE, messages include Avro datum, schema, and object metadata. By default, the KafkaAvroParser parses messages without including schema and metadata. If you enable this parameter, write your messages using the Avro API and confirm they contain only Avro datum. The default value is FALSE.
schema_registry_url
Required, the URL of the Confluent schema registry. This parameter is required to load data based on a schema registry version. If you are using an external schema, do not use this parameter. For more information, refer to Avro Schema Registry.
schema_registry_ssl_ca_path
Required for TLS connections, the path on the Vertica node's file system to a directory containing one or more hashed certificate authority (CA) certificates that signed the schema registry's server certificate. Each Vertica node must store hashed CA certificates on the same path.

For details on hashed CA certificates, see Hashed CA Certificates.

schema_registry_ssl_cert_path
Path on the Vertica node's file system to a client certificate issued by a certificate authority (CA) that the schema registry trusts.
schema_registry_ssl_key_path
Path on the Vertica server file system to the private key for the client certificate defined with schema_registry_ssl_cert_path.
schema_registry_ssl_key_password_path
Path on the Vertica server file system to the optional password for the private key defined with schema_registry_ssl_key_path.
schema_registry_subject
In the schema registry, the subject of the schema to use for data loading.
schema_registry_version
In the schema registry, the version of the schema to use for data loading.
key_separator
Sets the character to use as the separator between keys.

Data types

KafkaAvroParser supports the same data types as the favroparser. For details, see Avro data.

Example

The following example demonstrates loading data from Kafka in an Avro format. The statement:

  • Loads data into an existing flex table named weather_logs.

  • Copies data from the default Kafka broker (running on the local system on port 9092).

  • The source is named temperature.

  • The source has a single partition.

  • The load starts from offset 0.

  • The load ends either after 10 seconds or the load reaches the end of the source, whichever occurs first.

  • The KafkaAvroParser does not flatten any arrays, maps, or records it finds in the source.

  • The schema for the data is provided in the statement as a JSON string. It defines a record type named Weather that contains fields for a station name, a time, and a temperature.

  • Rejected rows of data are saved to a table named t_rejects1.

=> COPY weather_logs
   SOURCE KafkaSource(stream='temperature|0|0', stop_on_eof=true,
                      duration=interval '10 seconds')
   PARSER KafkaAvroParser(flatten_arrays=False, flatten_maps=False, flatten_records=False,
                          external_schema=E'{"type":"record","name":"Weather","fields":'
                                           '[{"name":"station","type":"string"},'
                                           '{"name":"time","type":"long"},'
                                           '{"name":"temp","type":"int"}]}')
   REJECTED DATA AS TABLE "t_rejects1";

Hashed CA certificates

Some parameters like schema_registry_ssl_ca_path require hashed CA certificates rather than the CA certificates themselves. A hashed CA certificate is a symbolic link to the original CA certificate. This symbolic link must have the following naming scheme:

CA_hash.0

For example, if the hash for ca_cert.pem is 9741086f, the hashed CA certificate would be 9741086f.0, a symbolic link to ca_cert.pem.

For details, see the OpenSSL 1.1 or 1.0 documentation.

Hashing CA certificates

The procedure for hashing CA certificates varies between versions of openssl. You can find your version of openssl with:

$ openssl version

For openssl 1.1 or higher, use openssl rehash. For example, if the directory /my_ca_certs/ contains ca_cert.pem, you can hash and symbolically link to it with:

$ openssl rehash /my_ca_certs/

This adds the hashed CA certificate to the directory:

$ ls -l
total 8
lrwxrwxrwx 1 ver ver    8 Mar 13 14:41 9da13359.0 -> ca_cert.pem
-rw-r--r-- 1 ver ver 1245 Mar 13 14:41 ca_cert.pem

For openssl 1.0, you can use openssl x509 -hash -noout -in ca_cert.pem to retrieve the hash and then create a symbolic link to the CA certificate. For example:

  1. Run the following command to retrieve the hash of the CA certificate ca_cert.pem:

    $ openssl x509 -hash -noout -in /my_ca_certs/ca_cert.pem
    9741086f
    
  2. Create a symbolic link to /my_ca_certs/ca_cert.pem:

    $ ln /my_ca_certs/ca_cert.pem /my_ca_certs/9741086f.0
    

This adds the hashed CA certificate to the directory:

$ ls -l
total 8
-rw-r--r-- 2 ver ver 1220 Mar 13 13:41 9741086f.0 -> ca_cert.pem
-rw-r--r-- 2 ver ver 1220 Mar 13 13:41 ca_cert.pem

2 - KafkaCheckBrokers

Retrieves information about the brokers in a Kafka cluster.

Retrieves information about the brokers in a Kafka cluster. This function is intended mainly for internal use—it used by the streaming job scheduler to get the list of brokers in the Kafka cluster. You can call the function to determine which brokers Vertica knows about.

Syntax

KafkaCheckBrokers(USING PARAMETERS brokers='hostname:port[,hostname2:port...]'
                                   [, kafka_conf='kafka_configuration_setting']
                                   [, timeout=timeout_sec])
brokers
The host name and port number of a broker in the Kafka cluster used to retrieve the list of brokers. You can supply more than one broker as a comma-separated list. If the list includes brokers from more than one Kafka cluster, the cluster containing the last host in the list is queried.
kafka_conf

A JSON string of property/value pairs to pass directly to the rdkafka, the library that Vertica uses to communicate with Kafka. This parameter directly sets global configuration properties that are not available through the Vertica integration with Kafka.

For details, see Directly setting Kafka library options.

kafka_conf_secret

Conceals sensitive configuration data that you must pass directly to the rdkafka library, such as passwords. This parameter accepts settings in the same format as kafka_conf.

Values passed to this parameter are not logged or stored in system tables.

timeout

Integer number of seconds to wait for a response from the Kafka cluster.

Example

=> SELECT KafkaCheckBrokers(USING PARAMETERS brokers='kafka01.example.com:9092')
          OVER ();
 broker_id |        hostname     | port
-----------+---------------------+------
         2 | kafka03.example.com | 9092
         1 | kafka02.example.com | 9092
         3 | kafka04.example.com | 9092
         0 | kafka01.example.com | 9092
(4 rows)

3 - KafkaExport

Sends Vertica data to Kafka.

Sends Vertica data to Kafka.

If Vertica successfully exports all of the rows of data to Kafka, this function returns zero rows. You can use the output of this function to copy failed messages to a secondary table for evaluation and reprocessing.

Syntax

SELECT KafkaExport(partitionColumn, keyColumn, valueColumn
    USING PARAMETERS brokers='host[:port][,host...]',
    topic='topicname'
    [,kafka_conf='kafka_configuration_setting']
    [,kafka_topic_conf='kafka_configuration_setting']
    [,kafka_conf_secret='kafka_configuration_setting']
    [,fail_on_conf_parse_error=Boolean])
OVER (partition_clause) FROM table;

Parameters

partitionColumn
The target partition for the export. If you set this value to NULL, Vertica uses the default partitioning scheme. You can use the partition argument to send messages to partitions that map to Vertica segments.
keyColumn
The user defined key value associated with the valueColumn. Use NULL to skip this argument.
valueColumn
The message itself. The column is a LONG VARCHAR, allowing you to send up to 32MB of data to Kafka. However, Kafka may impose its own limits on message size.
brokers
A string containing a comma-separated list of one or more host names or IP addresses (with optional port number) of brokers in the Kafka cluster.
topic
The Kafka topic to which you are exporting.
kafka_conf

A JSON string of property/value pairs to pass directly to the rdkafka, the library that Vertica uses to communicate with Kafka. This parameter directly sets global configuration properties that are not available through the Vertica integration with Kafka.

For details, see Directly setting Kafka library options.

kafka_topic_conf

A JSON string of property/value pairs to pass directly to the rdkafka, the library that Vertica uses to communicate with Kafka. This parameter directly sets topic-level configuration properties that are not available through the Vertica integration with Kafka.

For details, see Directly setting Kafka library options.

kafka_conf_secret

Conceals sensitive configuration data that you must pass directly to the rdkafka library, such as passwords. This parameter accepts settings in the same format as kafka_conf.

Values passed to this parameter are not logged or stored in system tables.

fail_on_conf_parse_error

Determines whether the function fails when kafka_conf contains incorrectly formatted options and values, or invalid configuration properties.

Default Value: FALSE

For accepted option and value formats, see Directly setting Kafka library options.

For a list of valid configuration properties, see the rdkafka GitHub repository.

Example

The following example converts each row from the iot_report table into a JSON object, and exports it to a Kafka topic named iot-report.

The iot_report table contains the following columns and data:

=> SELECT * FROM iot_report;
 server |        date         |        location         |    id
--------+---------------------+-------------------------+----------
      1 | 2016-10-11 04:09:28 |  -14.86058, 112.75848   | 70982027
      1 | 2017-07-02 12:37:48 |  -21.42197, -127.17672  | 49494918
      2 | 2017-01-20 03:05:24 |  37.17372, 136.14026    | 36670100
      2 | 2017-07-29 11:38:37 |  -38.99517, 171.72671   | 52049116
      1 | 2017-10-19 14:04:33 |  -71.72156, -36.27381   | 94328189
(5 rows)

To build the KafkaExport function, provide the following values to define the Kafka message:

  • partitionColumn: Use the server column. Because the server column values are one-based and Kafka partitions are zero-based, subtract 1 from the server value.

  • keyColumn: Use the id column. This requires that you explicitly cast the id value to a VARCHAR type.

  • valueColumn: Each message formats the date and location columns as the key/value pairs in the exported JSON object.

    To convert these rows to JSON format, use the ROW function to convert the date and location columns to structured data. Then, pass the ROW function to the TO_JSON function to encode the data structure as a JSON object.

Complete the remaining required function arguments and execute the KafkaExport function. If it succeeds, it returns 0 rows:

=> SELECT KafkaExport(server - 1, id::VARCHAR, TO_JSON(ROW(date, location))
                      USING PARAMETERS brokers='broker01:9092,broker02:9092',
                      topic='iot-results')
   OVER (PARTITION BEST)
   FROM iot_report;
 partition | key | message | failure_reason
-----------+-----+---------+----------------
(0 rows)

Use kafkacat to verify that the consumer contains the JSON-formatted data in the iot-results topic:

$ /opt/vertica/packages/kafka/bin/kafkacat -C -t iot-results -b broker01:9092,broker02:9092
{"date":"2017-01-20 03:05:24","location":" 37.17372, 136.14026   "}
{"date":"2017-07-29 11:38:37","location":" -38.99517, 171.72671  "}
{"date":"2016-10-11 04:09:28","location":" -14.86058, 112.75848  "}
{"date":"2017-10-19 14:04:33","location":" -71.72156, -36.27381  "}
{"date":"2017-07-02 12:37:48","location":" -21.42197, -127.17672 "}

See also

Producing data using KafkaExport

4 - KafkaJSONParser

The KafkaJSONParser parses JSON-formatted Kafka messages and loads them into a regular Vertica table or a Vertica flex table.

The KafkaJSONParser parses JSON-formatted Kafka messages and loads them into a regular Vertica table or a Vertica flex table.

Syntax

KafkaJSONParser(
        [enforce_length=Boolean]
        [, flatten_maps=Boolean]
        [, flatten_arrays=Boolean]
        [, start_point=string]
        [, start_point_occurrence=integer]
        [, omit_empty_keys=Boolean]
        [, reject_on_duplicate=Boolean]
        [, reject_on_materialized_type_error=Boolean]
        [, reject_on_empty_key=Boolean]
        [, key_separator=char]
        [, suppress_nonalphanumeric_key_chars=Boolean]
        )
enforce_length
If set to TRUE, rejects the row if data being loaded is too wide to fit into its column. Defaults to FALSE, which truncates any data that is too wide to fit into its column.
flatten_maps
If set to TRUE, flattens all JSON maps.
flatten_arrays
If set to TRUE, flattens JSON arrays.
start_point
Specifies the key in the JSON data that the parser should parse. The parser only extracts data that is within the value associated with the start_point key. It parses the values of all instances of the start_point key within the data.
start_point_occurrence
Integer value indicating which the occurrence of the key specified by the start_point parameter where the parser should begin parsing. For example, if you set this value to 4, the parser will only begin loading data from the fifth occurrence of the start_point key. Only has an effect if you also supply the start_point parameter.
omit_empty_keys
If set to TRUE, omits any key from the load data that does not have a value set.
reject_on_duplicate
If set to TRUE, rejects the row that contains duplicate key names. Key names are case-insensitive, so the keys "mykey" and "MyKey" are considered duplicates.
reject_on_materialized_type_error
If set to TRUE, rejects the row if the data includes keys matching an existing materialized column and has a key that cannot be mapped into the materialized column's data type.
reject_on_empty_key
If set to TRUE, rejects any row containing a key without a value.
key_separator
A single character to use as the separator between key values instead of the default period (.) character.
suppress_nonalphanumeric_key_chars
If set to TRUE, replaces all non-alphanumeric characters in JSON key values with an underscore (_) character.

See JSON data for more information.

The following example demonstrates loading JSON data from Kafka. The parameters in the statement define to the load to:

  • Load data into the pre-existing table named logs.

  • The KafkaSource streams the data from a single partition in the source called server_log.

  • The Kafka broker for the data load is running on the host named kafka01 on port 9092.

  • KafkaSource stops loading data after either 10 seconds or on reaching the end of the stream, whichever happens first.

  • The KafkJSONParser flattens any arrays or maps in the JSON data.

=> COPY logs SOURCE KafkaSource(stream='server_log|0|0',
                                stop_on_eof=true,
                                duration=interval '10 seconds',
                                brokers='kafka01:9092')
   PARSER KafkaJSONParser(flatten_arrays=True, flatten_maps=True);

5 - KafkaListManyTopics

Retrieves information about all topics from a Kafka broker.

Retrieves information about all topics from a Kafka broker. This function lists all of the topics defined in the Kafka cluster as well the number of partitions it contains and which brokers serve the topic.

Syntax

KafkaListManyTopics('broker:port[;...]'
                    [USING PARAMETERS
                        [kafka_conf='kafka_configuration_setting'
                        [, timeout=timeout_sec]])
broker
The hostname (or ip address) of a broker in the Kafka cluster
port
The port number on which the broker is running.
kafka_conf

A JSON string of property/value pairs to pass directly to the rdkafka, the library that Vertica uses to communicate with Kafka. This parameter directly sets global configuration properties that are not available through the Vertica integration with Kafka.

For details, see Directly setting Kafka library options.

kafka_conf_secret

Conceals sensitive configuration data that you must pass directly to the rdkafka library, such as passwords. This parameter accepts settings in the same format as kafka_conf.

Values passed to this parameter are not logged or stored in system tables.

timeout

Integer number of seconds to wait for a response from the Kafka cluster.

Example

=> \x
Expanded display is on.
=> SELECT KafkaListManyTopics('kafka01.example.com:9092')
   OVER (PARTITION AUTO);
-[ RECORD 1 ]--+--------------------------------------------------
brokers        | kafka01.example.com:9092,kafka02.example.com:9092
topic          | __consumer_offsets
num_partitions | 50
-[ RECORD 2 ]--+--------------------------------------------------
brokers        | kafka01.example.com:9092,kafka02.example.com:9092
topic          | iot_data
num_partitions | 1
-[ RECORD 3 ]--+--------------------------------------------------
brokers        | kafka01.example.com:9092,kafka02.example.com:9092
topic          | test
num_partitions | 1

6 - KafkaListTopics

Gets the list of topics available from a Kafka broker.

Gets the list of topics available from a Kafka broker.

Syntax

KafkaListTopics(USING PARAMETERS brokers='hostname:port[,hostname2:port2...]'
                                 [, kafka_conf='kafka_configuration_setting']
                                 [, timeout=timeout_sec])
brokers
The host name and port number of the broker to query for a topic list. You can supply more than one broker as a comma-separated list. However, the returned list will only contains the topics served by the last broker in the list.
kafka_conf

A JSON string of property/value pairs to pass directly to the rdkafka, the library that Vertica uses to communicate with Kafka. This parameter directly sets global configuration properties that are not available through the Vertica integration with Kafka.

For details, see Directly setting Kafka library options.

kafka_conf_secret

Conceals sensitive configuration data that you must pass directly to the rdkafka library, such as passwords. This parameter accepts settings in the same format as kafka_conf.

Values passed to this parameter are not logged or stored in system tables.

timeout

Integer number of seconds to wait for a response from the Kafka cluster.

Example

=> SELECT KafkaListTopics(USING PARAMETERS brokers='kafka1-01.example.com:9092')
          OVER ();
         topic         | num_partitions
-----------------------+----------------
 test                  |              1
 iot_data              |              1
 __consumer_offsets    |             50
 vertica_notifications |              1
 web_hits              |              1
(5 rows)

7 - KafkaOffsets

The KafkaOffsets user-defined transform function returns load operation statistics generated by the most recent invocation of KafkaSource.

The KafkaOffsets user-defined transform function returns load operation statistics generated by the most recent invocation of KafkaSource. Query KafkaOffsets to see the metadata produced by your most recent load operation. You can query KafkaOffsets after each KafkaSource invocation to view information about that load. If you are using the scheduler, you can also view historical load information in the stream_microbatch_history table.

For each load operation, KafkaOffsets returns the following:

  • source kafka topic

  • source kafka partition

  • starting offset

  • ending offset

  • number of messages loaded

  • number of bytes read

  • duration of the load operation

  • end message

  • end reason

The following example demonstrates calling KafkaOffsets to show partition information that was loaded using KafkaSource:

=> SELECT kpartition, start_offset, end_offset, msg_count, ending FROM (select KafkaOffsets() over()) AS stats ORDER BY kpartition;
 kpartition | start_offset | end_offset | msg_count |   ending
------------+--------------+------------+-----------+------------
          0 |           -2 |       9999 |      1068 | END_OFFSET

The output shows that KafkaSource loaded 1068 messages (rows) from Kafka in a single partition. The KafkaSource ended the data load because it reached the ending offset.

8 - KafkaParser

The KafkaParser does not parse data loaded from Kafka.

The KafkaParser does not parse data loaded from Kafka. Instead, it passes the messages through as LONG VARCHAR values. Use this parser when you want to load raw Kafka messages into Vertica for further processing. You can use this parser as a catch-all for unsupported formats.

KafkaParser does not take any parameters.

Example

The following example loads raw messages from a Kafka topic named iot-data into a table named raw_iot.

=> CREATE TABLE raw_iot(message LONG VARCHAR);
CREATE TABLE
=> COPY raw_iot SOURCE KafkaSource(stream='iot-data|0|-2,iot-data|1|-2,iot-data|2|-2',
                                   brokers='docd01:6667,docd03:6667', stop_on_eof=TRUE)
                PARSER KafkaParser();
 Rows Loaded
-------------
        5000
(1 row)

=> select * from raw_iot limit 10;
              message
------------------------------------
 10039, 59, -68.951406, -19.270126
 10042, 40, -82.688712, 4.7187705
 10054, 6, -153.805268, -10.5173935
 10054, 71, -135.613150, 58.286458
 10081, 44, 130.288419, -77.344405
 10104, -5, 77.882598, -56.600744
 10132, 87, 103.530616, -69.672863
 10135, 6, -121.420382, 15.3229855
 10166, 77, -179.592211, 42.0477075
 10183, 62, 17.225394, -55.6644765
(10 rows)

9 - KafkaSource

The KafkaSource UDL accesses data from a Kafka cluster.

The KafkaSource UDL accesses data from a Kafka cluster. All Kafka parsers must use KafkaSource. Messages processed by KafkaSource must be at least one byte in length. KafkaSource writes an error message to vertica.log for zero-length messages.

The output of KafkaSource does not work directly with any of the non-Kafka parsers in Vertica (such as the FCSVPARSER). The KafkaParser produces additional metadata about the stream that parsers need to use in order to correctly parse the data. You must use filters such as KafkaInsertDelimiters to transform the data into a format that can be processed by other parsers. See Parsing custom formats for more an example.

You can cancel a running KafkaSource data load by using a close session function such as CLOSE_ALL_SESSIONS.

Syntax

KafkaSource(stream='topic_name|partition|start_offset[|end_offset]'[, param=value [,...] ] )
stream
Required. Defines the data to be loaded as a comma-separated list of one or more partitions. Each partition is defined by three required values and one optional value separated by pipe characters (|) :
  • topic_name: the name of the Kafka topic to load data from. You can read from different Kafka topics in the same stream parameter, with some limitations. See Loading from Multiple Topics in the Same Stream Parameter below for more information.

  • partition: the partition in the Kafka topic to copy.

  • start_offset: the offset in the Kafka topic where the load will begin. This offset is inclusive (the message with the offset start_offset is loaded). See Special Starting Offset Values below for additional options.

  • end_offset: the optional offset where the load should end. This offset is exclusive (the message with the offset end_offset will not be loaded).
    To end a load using end_offset, you must supply an ending offset value for all partitions in the stream parameter. Attempting to set an ending offset for some partitions and not set offset values for others results in an error.
    If you do not specify an ending offset, you must supply at least one other ending condition using stop_on_eof or duration.

brokers
A comma-separated list of host:port pairs of the brokers in the Kafka cluster. Vertica recommends running Kafka on a different machine than Vertica.

Default: localhost:9092

duration
An INTERVAL that specifies the duration of the frame. After this specified amount of time, KafkaSource terminates the COPY statements. If this parameter is not set, you must set at least one other ending condition by using stop_on_eof or specify an ending offset instead. See Duration Note below for more information.
executionparallelism
The number of threads to use when loading data. Normally, you set this to an integer value between 1 and the number of partitions the node is loading from. Setting this parameter to a reduced value limits the number of threads used to process any COPY statement. It also increases the throughput of short queries issued in the pool, especially if the queries are executed concurrently.

If you do not specify this parameter, Vertica automatically creates a thread for every partition, up to the limit allowed by the resource pool.

If the value you specify for the KafkaSource is lower than the value specified for the scheduler resource pool, the KafkaSource value applies. This value cannot exceed the value specified for the scheduler's resource pool.

stop_on_eof
Determines whether KafkaSource should terminate the COPY statement after it reaches the end of a file. If this value is not set, you must set at least one other ending condition using duration or by supplying ending offsets instead.

Default: FALSE

group_id

The name of the Kafka consumer group to which Vertica reports its progress consuming messages. Set this value to disable progress reports to a Kafka consumer group. For details, see Monitoring Vertica message consumption with consumer groups.

Default: vertica_database-name

kafka_conf

A JSON string of property/value pairs to pass directly to the rdkafka, the library that Vertica uses to communicate with Kafka. This parameter directly sets global configuration properties that are not available through the Vertica integration with Kafka.

For details, see Directly setting Kafka library options.

kafka_topic_conf

A JSON string of property/value pairs to pass directly to the rdkafka, the library that Vertica uses to communicate with Kafka. This parameter directly sets topic-level configuration properties that are not available through the Vertica integration with Kafka.

For details, see Directly setting Kafka library options.

kafka_conf_secret

Conceals sensitive configuration data that you must pass directly to the rdkafka library, such as passwords. This parameter accepts settings in the same format as kafka_conf.

Values passed to this parameter are not logged or stored in system tables.

fail_on_conf_parse_error

Determines whether the function fails when kafka_conf contains incorrectly formatted options and values, or invalid configuration properties.

Default Value: FALSE

For accepted option and value formats, see Directly setting Kafka library options.

For a list of valid configuration properties, see the rdkafka GitHub repository.

Special starting offset values

The start_offset portion of the stream parameter lets you start loading messages from a specific point in the topic's partition. It also accepts one of two special offset values:

  • -2 tells KafkaSource to start loading at the earliest available message in the topic's partition. This value is useful when you want to load as many messages as you can from the Kafka topic's partition.

  • -3 tells KafkaSource to start loading from the consumer group's saved offset. If the consumer group does not have a saved offset, it starts loading from the earliest available message in the topic partition. See Monitoring Vertica message consumption with consumer groups for more information.

Loading from multiple topics in the same stream parameter

You can load from multiple Kafka topics in a single stream parameter as long as you follow these guidelines:

  • The data for the topics must be in the same format because you pass the data from KafkaSource to a single parser. For example, you cannot load data from one topic that is in Avro format and another in JSON format.

  • Similarly, you need to be careful if you are loading Avro data and specifying an external schema from a registry. The Avro parser accepts a single schema per data load. If the data from the separate topics have different schemas, then all of the data from one of the topics will be rejected.

  • The data in the different topics should have the same (or very similar) schemas, especially if you are loading data into a traditional Vertica table. While you can load data with different schemas into a flex table, there are only a few scenarios where it makes sense to combine dissimilar data into a single table.

Duration note

The duration parameter applies to the length of time that Vertica allows the KafkaSource function to run. It usually reflects the amount of time the overall load statement takes. However, if KafkaSource is loading a large volume of data or the data needs extensive processing and parsing, the overall runtime of the query can exceed the amount of time specified in duration.

Example

The following example demonstrates calling KafkaSource to load data from Kafka into an existing flex table named web_table with the following options:

  • The stream is named web_hits which has a single partition.

  • The load starts at the earliest message in the stream (identified by passing -2 as the start offset).

  • The load ends when it reaches the message with offset 1000.

  • The Kafka cluster's brokers are kafka01 and kafka03 in the example.com domain.

  • The brokers are listening on port 9092.

  • The load ends if it reaches the end of the stream before reaching the message with offset 1000. If you do not supply this option, the connector waits until Kafka sends a message with offset 1000.

  • The loaded data is sent to the KafkaJSONParser for processing.

=> COPY web_table
   SOURCE KafkaSource(stream='web_hits|0|-2|1000',
                      brokers='kafka01.example.com:9092,kafka03.example.com:9092',
                      stop_on_eof=true)
   PARSER KafkaJSONParser();
 Rows Loaded
-------------
        1000
(1 row)

To view details about this load operation, query KafkaOffsets. KafkaOffsets returns metadata about the messages that Vertica consumed from Kafka during the most recent KafkaSource invocation:

=> SELECT KafkaOffsets() OVER();
  ktopic  | kpartition | start_offset | end_offset | msg_count | bytes_read |    duration     |   ending   |      end_msg
----------+------------+--------------+------------+-----------+------------+-----------------+------------+-------------------
 web_hits |          0 |            0 |        999 |      1000 |     197027 | 00:00:00.385365 | END_OFFSET | Last message read
(1 row)

The msg_count column verifies that Vertica loaded 1000 messages, and the ending column indicates that Vertica stopped consuming messages when it reached the message with the offset 1000.

10 - KafkaTopicDetails

Retrieves information about the specified topic from one or more Kafka brokers.

Retrieves information about the specified topic from one or more Kafka brokers. This function lists details about the topic partitions, and the Kafka brokers that serve each partition in the Kafka cluster.

Syntax

KafkaTopicDetails(USING PARAMETERS brokers='hostname:port[,hostname2:port2...]'
                                  , topic=topic_name
                                 [, kafka_conf='option=value[;option2=value2...]']
                                 [, timeout=timeout_sec])
brokers
The hostname (or IP address) of a broker in the Kafka cluster.
port
The port number on which the broker is running.
topic
Kafka topic that you want details for.
kafka_conf
A semicolon-delimited list of option=value pairs to pass directly to the rdkafka library. This is the library Vertica uses to communicate with Kafka. You can use this parameter to directly set configuration options that are not available through the Vertica integration with Kafka. See Directly setting Kafka library options for details.
kafka_conf_secret

Conceals sensitive configuration data that you must pass directly to the rdkafka library, such as passwords. This parameter accepts settings in the same format as kafka_conf.

Values passed to this parameter are not logged or stored in system tables.

timeout
Integer number of seconds to wait for a response from the Kafka cluster.

Example

=> SELECT KafkaTopicDetails(USING PARAMETERS brokers='kafka1-01.example.com:9092',topic='iot_data') OVER();
 partition_id | lead_broker | replica_brokers | in_sync_replica_brokers
--------------+-------------+-----------------+-------------------------
            0 |           0 | 0               | 0
            1 |           1 | 1               | 1
            2 |           0 | 0               | 0
            3 |           1 | 1               | 1
            4 |           0 | 0               | 0
(5 rows)