KafkaSource
The KafkaSource UDL accesses data from a Kafka cluster. All Kafka parsers must use KafkaSource. Messages processed by KafkaSource must be at least one byte in length. KafkaSource writes an error message to vertica.log for zero-length messages.
The output of KafkaSource does not work directly with any of the non-Kafka parsers in Vertica (such as the FCSVPARSER). KafkaSource produces additional metadata about the stream that the Kafka parsers need in order to correctly parse the data. You must use filters such as KafkaInsertDelimiters to transform the data into a format that other parsers can process. See Parsing custom formats for an example.
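For example, one way to load newline-delimited CSV messages with a non-Kafka parser is to insert a delimiter between messages before handing the stream to FCSVPARSER. This is a sketch: the csv_table table and csv_topic topic are hypothetical placeholders.

=> COPY csv_table
   SOURCE KafkaSource(stream='csv_topic|0|-2',
                      brokers='kafka01.example.com:9092',
                      stop_on_eof=true)
   FILTER KafkaInsertDelimiters(delimiter = E'\n')
   PARSER FCSVPARSER();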
You can cancel a running KafkaSource data load by using a close session function such as CLOSE_ALL_SESSIONS.
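For example, issuing the following from another session closes all sessions, including any that are running a KafkaSource load:

=> SELECT CLOSE_ALL_SESSIONS();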
Syntax
KafkaSource(stream='topic_name|partition|start_offset[|end_offset]'[, param=value [,...] ] )
stream
- Required. Defines the data to be loaded as a comma-separated list of one or more partitions. Each partition is defined by three required values and one optional value separated by pipe characters (|):
- topic_name: the name of the Kafka topic to load data from. You can read from different Kafka topics in the same stream parameter, with some limitations. See Loading from Multiple Topics in the Same Stream Parameter below for more information.
- partition: the partition in the Kafka topic to copy.
- start_offset: the offset in the Kafka topic where the load begins. This offset is inclusive (the message with the offset start_offset is loaded). See Special Starting Offset Values below for additional options.
- end_offset: the optional offset where the load ends. This offset is exclusive (the message with the offset end_offset is not loaded).
To end a load using end_offset, you must supply an ending offset value for all partitions in the stream parameter. Attempting to set an ending offset for some partitions and not set offset values for others results in an error.
If you do not specify an ending offset, you must supply at least one other ending condition using stop_on_eof or duration.
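For example, the following stream value loads two partitions of a hypothetical web_hits topic from the earliest available message, ending at offsets 1000 and 1500 respectively:

stream='web_hits|0|-2|1000,web_hits|1|-2|1500'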
brokers
- A comma-separated list of host:port pairs of the brokers in the Kafka cluster. Vertica recommends running Kafka on a different machine than Vertica.
Default: localhost:9092
duration
- An INTERVAL that specifies the duration of the frame. After this specified amount of time, KafkaSource terminates the COPY statement. If this parameter is not set, you must set at least one other ending condition by using stop_on_eof or by specifying an ending offset instead. See Duration Note below for more information.
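For example, the following sketch stops the load after two minutes, whether or not more messages remain in the topic (the table and topic names are placeholders):

=> COPY web_table
   SOURCE KafkaSource(stream='web_hits|0|-2',
                      brokers='kafka01.example.com:9092',
                      duration=INTERVAL '2 minutes')
   PARSER KafkaJSONParser();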
executionparallelism
- The number of threads to use when loading data. Normally, you set this to an integer value between 1 and the number of partitions the node is loading from. Setting this parameter to a reduced value limits the number of threads used to process any COPY statement. It also increases the throughput of short queries issued in the pool, especially if the queries are executed concurrently.
If you do not specify this parameter, Vertica automatically creates a thread for every partition, up to the limit allowed by the resource pool.
If the value you specify for the KafkaSource is lower than the value specified for the scheduler resource pool, the KafkaSource value applies. This value cannot exceed the value specified for the scheduler's resource pool.
stop_on_eof
- Determines whether KafkaSource terminates the COPY statement after it reaches the end of the stream. If this value is not set, you must set at least one other ending condition using duration or by supplying ending offsets instead.
Default: FALSE
group_id
- The name of the Kafka consumer group to which Vertica reports its progress consuming messages. Set this value to NULL to disable progress reports to a Kafka consumer group. For details, see Monitoring Vertica message consumption with consumer groups.
Default: vertica_database-name
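For example, the following sketch reports progress under a custom consumer group name instead of the default (the group name, table, and topic are placeholders):

=> COPY web_table
   SOURCE KafkaSource(stream='web_hits|0|-2',
                      brokers='kafka01.example.com:9092',
                      stop_on_eof=true,
                      group_id='vertica_web_hits_loader')
   PARSER KafkaJSONParser();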
kafka_conf
A JSON string of property/value pairs to pass directly to rdkafka, the library that Vertica uses to communicate with Kafka. This parameter directly sets global configuration properties that are not available through the Vertica integration with Kafka.
For details, see Directly setting Kafka library options.
kafka_topic_conf
A JSON string of property/value pairs to pass directly to rdkafka, the library that Vertica uses to communicate with Kafka. This parameter directly sets topic-level configuration properties that are not available through the Vertica integration with Kafka.
For details, see Directly setting Kafka library options.
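For example, the following sketch passes rdkafka's client.id global property directly; the value is arbitrary, and the table and topic names are placeholders:

=> COPY web_table
   SOURCE KafkaSource(stream='web_hits|0|-2',
                      brokers='kafka01.example.com:9092',
                      stop_on_eof=true,
                      kafka_conf='{"client.id":"vertica-loader"}')
   PARSER KafkaJSONParser();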
kafka_conf_secret
Conceals sensitive configuration data that you must pass directly to the rdkafka library, such as passwords. This parameter accepts settings in the same format as kafka_conf.
Values passed to this parameter are not logged or stored in system tables.
fail_on_conf_parse_error
Determines whether the function fails when kafka_conf contains incorrectly formatted options and values, or invalid configuration properties.
Default: FALSE
For accepted option and value formats, see Directly setting Kafka library options.
For a list of valid configuration properties, see the rdkafka GitHub repository.
Special starting offset values
The start_offset portion of the stream parameter lets you start loading messages from a specific point in the topic's partition. It also accepts one of two special offset values:
- -2 tells KafkaSource to start loading at the earliest available message in the topic's partition. This value is useful when you want to load as many messages as you can from the Kafka topic's partition.
- -3 tells KafkaSource to start loading from the consumer group's saved offset. If the consumer group does not have a saved offset, it starts loading from the earliest available message in the topic's partition. See Monitoring Vertica message consumption with consumer groups for more information.
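For example, the following sketch resumes loading from the consumer group's saved offset by passing -3 as the start offset (the table and topic names are placeholders):

=> COPY web_table
   SOURCE KafkaSource(stream='web_hits|0|-3',
                      brokers='kafka01.example.com:9092',
                      stop_on_eof=true)
   PARSER KafkaJSONParser();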
Loading from multiple topics in the same stream parameter
You can load from multiple Kafka topics in a single stream parameter as long as you follow these guidelines:
- The data for the topics must be in the same format because you pass the data from KafkaSource to a single parser. For example, you cannot load data from one topic that is in Avro format and another in JSON format.
- Similarly, be careful if you are loading Avro data and specifying an external schema from a registry. The Avro parser accepts a single schema per data load. If the data from the separate topics have different schemas, then all of the data from one of the topics will be rejected.
- The data in the different topics should have the same (or very similar) schemas, especially if you are loading data into a traditional Vertica table. While you can load data with different schemas into a flex table, there are only a few scenarios where it makes sense to combine dissimilar data into a single table.
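For example, the following sketch loads two hypothetical topics whose messages share the same JSON layout into a single flex table:

=> COPY web_table
   SOURCE KafkaSource(stream='web_hits|0|-2,mobile_hits|0|-2',
                      brokers='kafka01.example.com:9092',
                      stop_on_eof=true)
   PARSER KafkaJSONParser();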
Duration note
The duration parameter applies to the length of time that Vertica allows the KafkaSource function to run. It usually reflects the amount of time the overall load statement takes. However, if KafkaSource is loading a large volume of data or the data needs extensive processing and parsing, the overall runtime of the query can exceed the amount of time specified in duration.
Example
The following example demonstrates calling KafkaSource to load data from Kafka into an existing flex table named web_table with the following options:
- The stream is named web_hits and has a single partition.
- The load starts at the earliest message in the stream (identified by passing -2 as the start offset).
- The load ends when it reaches the message with offset 1000.
- The Kafka cluster's brokers are kafka01 and kafka03 in the example.com domain.
- The brokers are listening on port 9092.
- The load ends if it reaches the end of the stream before reaching the message with offset 1000, because stop_on_eof is set to true. If you do not supply this option, the connector waits until Kafka sends a message with offset 1000.
- The loaded data is sent to the KafkaJSONParser for processing.
=> COPY web_table
SOURCE KafkaSource(stream='web_hits|0|-2|1000',
brokers='kafka01.example.com:9092,kafka03.example.com:9092',
stop_on_eof=true)
PARSER KafkaJSONParser();
Rows Loaded
-------------
1000
(1 row)
To view details about this load operation, query KafkaOffsets. KafkaOffsets returns metadata about the messages that Vertica consumed from Kafka during the most recent KafkaSource invocation:
=> SELECT KafkaOffsets() OVER();
ktopic | kpartition | start_offset | end_offset | msg_count | bytes_read | duration | ending | end_msg
----------+------------+--------------+------------+-----------+------------+-----------------+------------+-------------------
web_hits | 0 | 0 | 999 | 1000 | 197027 | 00:00:00.385365 | END_OFFSET | Last message read
(1 row)
The msg_count column verifies that Vertica loaded 1000 messages, and the ending column indicates that Vertica stopped consuming messages when it reached the message with offset 1000.