KafkaSource

The KafkaSource UDL accesses data from a Kafka cluster.

The KafkaSource UDL accesses data from a Kafka cluster. All Kafka parsers must use KafkaSource. Messages processed by KafkaSource must be at least one byte in length. KafkaSource writes an error message to vertica.log for zero-length messages.

The output of KafkaSource does not work directly with any of the non-Kafka parsers in Vertica (such as the FCSVPARSER). The KafkaParser produces additional metadata about the stream that parsers need to use in order to correctly parse the data. You must use filters such as KafkaInsertDelimiters to transform the data into a format that can be processed by other parsers. See Parsing custom formats for more an example.

You can cancel a running KafkaSource data load by using a close session function such as CLOSE_ALL_SESSIONS.

Syntax

KafkaSource(stream='topic_name|partition|start_offset[|end_offset]'[, param=value [,...] ] )
stream
Required. Defines the data to be loaded as a comma-separated list of one or more partitions. Each partition is defined by three required values and one optional value separated by pipe characters (|) :
  • topic_name: the name of the Kafka topic to load data from. You can read from different Kafka topics in the same stream parameter, with some limitations. See Loading from Multiple Topics in the Same Stream Parameter below for more information.

  • partition: the partition in the Kafka topic to copy.

  • start_offset: the offset in the Kafka topic where the load will begin. This offset is inclusive (the message with the offset start_offset is loaded). See Special Starting Offset Values below for additional options.

  • end_offset: the optional offset where the load should end. This offset is exclusive (the message with the offset end_offset will not be loaded).
    To end a load using end_offset, you must supply an ending offset value for all partitions in the stream parameter. Attempting to set an ending offset for some partitions and not set offset values for others results in an error.
    If you do not specify an ending offset, you must supply at least one other ending condition using stop_on_eof or duration.

brokers
A comma-separated list of host:port pairs of the brokers in the Kafka cluster. Vertica recommends running Kafka on a different machine than Vertica.

Default: localhost:9092

duration
An INTERVAL that specifies the duration of the frame. After this specified amount of time, KafkaSource terminates the COPY statements. If this parameter is not set, you must set at least one other ending condition by using stop_on_eof or specify an ending offset instead. See Duration Note below for more information.
executionparallelism
The number of threads to use when loading data. Normally, you set this to an integer value between 1 and the number of partitions the node is loading from. Setting this parameter to a reduced value limits the number of threads used to process any COPY statement. It also increases the throughput of short queries issued in the pool, especially if the queries are executed concurrently.

If you do not specify this parameter, Vertica automatically creates a thread for every partition, up to the limit allowed by the resource pool.

If the value you specify for the KafkaSource is lower than the value specified for the scheduler resource pool, the KafkaSource value applies. This value cannot exceed the value specified for the scheduler's resource pool.

stop_on_eof
Determines whether KafkaSource should terminate the COPY statement after it reaches the end of a file. If this value is not set, you must set at least one other ending condition using duration or by supplying ending offsets instead.

Default: FALSE

group_id

The name of the Kafka consumer group to which Vertica reports its progress consuming messages. Set this value to disable progress reports to a Kafka consumer group. For details, see Monitoring Vertica message consumption with consumer groups.

Default: vertica_database-name

kafka_conf

A JSON-formatted object of option/value pairs to pass directly to the rdkafka library. This is the library Vertica uses to communicate with Kafka. You can use this parameter to directly set configuration options that are not available through the Vertica integration with Kafka. See Directly setting Kafka library options for details.

fail_on_conf_parse_error

Determines whether the function fails when kafka_conf contains incorrectly formatted options and values, or invalid configuration properties.

Default Value: FALSE

For accepted option and value formats, see Directly setting Kafka library options.

For a list of valid configuration properties, see the rdkafka GitHub repository.

Special starting offset values

The start_offset portion of the stream parameter lets you start loading messages from a specific point in the topic's partition. It also accepts one of two special offset values:

  • -2 tells Set Snippet Variable Value in Topic to start loading at the earliest available message in the topic's partition. This value is useful when you want to load as many messages as you can from the Kafka topic's partition.

  • -3 tells Set Snippet Variable Value in Topic to start loading from the consumer group's saved offset. If the consumer group does not have a saved offset, it starts loading from the earliest available message in the topic partition. See Monitoring Vertica message consumption with consumer groups for more information.

Loading from multiple topics in the same stream parameter

You can load from multiple Kafka topics in a single stream parameter as long as you follow these guidelines:

  • The data for the topics must be in the same format because you pass the data from KafkaSource to a single parser. For example, you cannot load data from one topic that is in Avro format and another in JSON format.

  • Similarly, you need to be careful if you are loading Avro data and specifying an external schema from a registry. The Avro parser accepts a single schema per data load. If the data from the separate topics have different schemas, then all of the data from one of the topics will be rejected.

  • The data in the different topics should have the same (or very similar) schemas, especially if you are loading data into a traditional Vertica table. While you can load data with different schemas into a flex table, there are only a few scenarios where it makes sense to combine dissimilar data into a single table.

Duration note

The duration parameter applies to the length of time that Vertica allows the KafkaSource function to run. It usually reflects the amount of time the overall load statement takes. However, if KafkaSource is loading a large volume of data or the data needs extensive processing and parsing, the overall runtime of the query can exceed the amount of time specified in duration.

Example

The following example demonstrates calling KafkaSource to load data from Kafka into an existing flex table named web_table with the following options:

  • The stream is named web_hits which has a single partition.

  • The load starts at the earliest message in the stream (identified by passing -2 as the start offset).

  • The load ends when it reaches the message with offset 1000.

  • The Kafka cluster's brokers are kafka01 and kafka03 in the example.com domain.

  • The brokers are listening on port 9092.

  • The load ends if it reaches the end of the stream before reaching the message with offset 1000. If you do not supply this option, the connector waits until Kafka sends a message with offset 1000.

  • The loaded data is sent to the KafkaJSONParser for processing.

=> COPY web_table
   SOURCE KafkaSource(stream='web_hits|0|-2|1000',
                      brokers='kafka01.example.com:9092,kafka03.example.com:9092',
                      stop_on_eof=true)
   PARSER KafkaJSONParser();
 Rows Loaded
-------------
        1000
(1 row)

To view details about this load operation, query KafkaOffsets. KafkaOffsets returns metadata about the messages that Vertica consumed from Kafka during the most recent KafkaSource invocation:

=> SELECT KafkaOffsets() OVER();
  ktopic  | kpartition | start_offset | end_offset | msg_count | bytes_read |    duration     |   ending   |      end_msg
----------+------------+--------------+------------+-----------+------------+-----------------+------------+-------------------
 web_hits |          0 |            0 |        999 |      1000 |     197027 | 00:00:00.385365 | END_OFFSET | Last message read
(1 row)

The msg_count column verifies that Vertica loaded 1000 messages, and the ending column indicates that Vertica stopped consuming messages when it reached the message with the offset 1000.