This section lists the functions that make up Vertica's Kafka integration feature.
Kafka function reference
- 1: KafkaAvroParser
- 2: KafkaCheckBrokers
- 3: KafkaExport
- 4: KafkaJSONParser
- 5: KafkaListManyTopics
- 6: KafkaListTopics
- 7: KafkaOffsets
- 8: KafkaParser
- 9: KafkaSource
- 10: KafkaTopicDetails
1 - KafkaAvroParser
The KafkaAvroParser parses Avro-formatted Kafka messages and loads them into a regular Vertica table or a Vertica flex table.
Syntax
KafkaAvroParser(param=value[,...])
enforce_length
- When set to TRUE, rejects the row if any value is too wide to fit into its column. When using the default setting (FALSE), the parser truncates any value that is too wide to fit within the column's maximum width.
reject_on_materialized_type_error
- When set to TRUE, rejects the row if it contains a materialized column value that cannot be mapped into the materialized column's data type.
flatten_maps
- If set to TRUE, flattens all Avro maps.
flatten_arrays
- If set to TRUE, flattens Avro arrays.
flatten_records
- If set to TRUE, flattens all Avro records.
external_schema
- The schema of the Avro file as a JSON string. If this parameter is not specified, the parser assumes that each message has the schema on it. If you are using a schema registry, do not use this parameter.
codec
- The codec in which the Avro file was written. Valid values are:
  - default: Data is not compressed and no codec is needed.
  - deflate: Data is compressed using the deflate codec.
  - snappy: Snappy compression.
Note
This option is mainly provided for backwards compatibility. You usually have Kafka compress data at the message level, and have KafkaSource decompress the message for you.
with_metadata
- If set to TRUE, messages include Avro datum, schema, and object metadata. By default, the KafkaAvroParser parses messages without including schema and metadata. If you enable this parameter, write your messages using the Avro API and confirm they contain only Avro datum. The default value is FALSE.
schema_registry_url
- Required, the URL of the Confluent schema registry. This parameter is required to load data based on a schema registry version. If you are using an external schema, do not use this parameter. For more information, refer to Avro Schema Registry.
Note
TLS connections must use the HTTPS protocol.
schema_registry_ssl_ca_path
- Required for TLS connections, the path on the Vertica node's file system to a directory containing one or more hashed certificate authority (CA) certificates that signed the schema registry's server certificate. Each Vertica node must store hashed CA certificates on the same path.
Important
In some circumstances, you might receive a validation error stating that the KafkaAvroParser cannot locate your CA certificate. To correct this error, store your CA certificate in your operating system's default bundle. For example, store the CA certificate in /etc/pki/tls/certs/ca-bundle.crt on Red Hat operating systems.
For details on hashed CA certificates, see Hashed CA certificates.
schema_registry_ssl_cert_path
- Path on the Vertica node's file system to a client certificate issued by a certificate authority (CA) that the schema registry trusts.
schema_registry_ssl_key_path
- Path on the Vertica server file system to the private key for the client certificate defined with schema_registry_ssl_cert_path.
schema_registry_ssl_key_password_path
- Path on the Vertica server file system to the optional password for the private key defined with schema_registry_ssl_key_path.
schema_registry_subject
- In the schema registry, the subject of the schema to use for data loading.
schema_registry_version
- In the schema registry, the version of the schema to use for data loading.
key_separator
- Sets the character to use as the separator between keys.
Data types
KafkaAvroParser supports the same data types as the favroparser. For details, see Avro data.
Example
The following example demonstrates loading data from Kafka in Avro format. The statement:
- Loads data into an existing flex table named weather_logs.
- Copies data from the default Kafka broker (running on the local system on port 9092).
- Reads from a source named temperature.
- Reads from a single partition in that source.
- Starts the load from offset 0.
- Ends the load either after 10 seconds or when the load reaches the end of the source, whichever occurs first.
- Does not flatten any arrays, maps, or records the KafkaAvroParser finds in the source.
- Supplies the schema for the data in the statement as a JSON string. The schema defines a record type named Weather that contains fields for a station name, a time, and a temperature.
- Saves rejected rows of data to a table named t_rejects1.
=> COPY weather_logs
SOURCE KafkaSource(stream='temperature|0|0', stop_on_eof=true,
duration=interval '10 seconds')
PARSER KafkaAvroParser(flatten_arrays=False, flatten_maps=False, flatten_records=False,
external_schema=E'{"type":"record","name":"Weather","fields":'
'[{"name":"station","type":"string"},'
'{"name":"time","type":"long"},'
'{"name":"temp","type":"int"}]}')
REJECTED DATA AS TABLE "t_rejects1";
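If the schema for your Avro messages is stored in a Confluent schema registry, you can reference the registry instead of embedding the schema in the statement. The following is a minimal sketch; the registry URL https://registry.example.com:8081 and the subject name weather-value are placeholders that you must replace with values from your own registry:
=> COPY weather_logs
     SOURCE KafkaSource(stream='temperature|0|0', stop_on_eof=true,
                        duration=interval '10 seconds')
     PARSER KafkaAvroParser(flatten_arrays=False, flatten_maps=False, flatten_records=False,
                            schema_registry_url='https://registry.example.com:8081',
                            schema_registry_subject='weather-value',
                            schema_registry_version='1')
     REJECTED DATA AS TABLE "t_rejects1";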
Hashed CA certificates
Some parameters, like schema_registry_ssl_ca_path, require hashed CA certificates rather than the CA certificates themselves. A hashed CA certificate is a symbolic link to the original CA certificate. This symbolic link must have the following naming scheme:
CA_hash.0
For example, if the hash for ca_cert.pem is 9741086f, the hashed CA certificate would be 9741086f.0, a symbolic link to ca_cert.pem.
For details, see the OpenSSL 1.1 or 1.0 documentation.
Hashing CA certificates
The procedure for hashing CA certificates varies between versions of openssl. You can find your version of openssl with:
$ openssl version
For openssl 1.1 or higher, use openssl rehash. For example, if the directory /my_ca_certs/ contains ca_cert.pem, you can hash and symbolically link to it with:
$ openssl rehash /my_ca_certs/
This adds the hashed CA certificate to the directory:
$ ls -l
total 8
lrwxrwxrwx 1 ver ver 8 Mar 13 14:41 9da13359.0 -> ca_cert.pem
-rw-r--r-- 1 ver ver 1245 Mar 13 14:41 ca_cert.pem
For openssl 1.0, you can use openssl x509 -hash -noout -in ca_cert.pem to retrieve the hash and then create a symbolic link to the CA certificate. For example:
- Run the following command to retrieve the hash of the CA certificate ca_cert.pem:
  $ openssl x509 -hash -noout -in /my_ca_certs/ca_cert.pem
  9741086f
- Create a symbolic link to /my_ca_certs/ca_cert.pem:
  $ ln -s /my_ca_certs/ca_cert.pem /my_ca_certs/9741086f.0
This adds the hashed CA certificate to the directory:
$ ls -l
total 8
lrwxrwxrwx 1 ver ver   25 Mar 13 13:41 9741086f.0 -> /my_ca_certs/ca_cert.pem
-rw-r--r-- 1 ver ver 1220 Mar 13 13:41 ca_cert.pem
2 - KafkaCheckBrokers
Retrieves information about the brokers in a Kafka cluster. This function is intended mainly for internal use; the streaming job scheduler uses it to get the list of brokers in the Kafka cluster. You can call the function to determine which brokers Vertica knows about.
Syntax
KafkaCheckBrokers(USING PARAMETERS brokers='hostname:port[,hostname2:port...]'
[, kafka_conf='kafka_configuration_setting']
[, timeout=timeout_sec])
brokers
- The host name and port number of a broker in the Kafka cluster used to retrieve the list of brokers. You can supply more than one broker as a comma-separated list. If the list includes brokers from more than one Kafka cluster, the cluster containing the last host in the list is queried.
kafka_conf
A JSON string of property/value pairs to pass directly to rdkafka, the library that Vertica uses to communicate with Kafka. This parameter directly sets global configuration properties that are not available through the Vertica integration with Kafka.
For details, see Directly setting Kafka library options.
kafka_conf_secret
Conceals sensitive configuration data that you must pass directly to the rdkafka library, such as passwords. This parameter accepts settings in the same format as kafka_conf. Values passed to this parameter are not logged or stored in system tables.
timeout
Integer number of seconds to wait for a response from the Kafka cluster.
Example
=> SELECT KafkaCheckBrokers(USING PARAMETERS brokers='kafka01.example.com:9092')
OVER ();
broker_id | hostname | port
-----------+---------------------+------
2 | kafka03.example.com | 9092
1 | kafka02.example.com | 9092
3 | kafka04.example.com | 9092
0 | kafka01.example.com | 9092
(4 rows)
3 - KafkaExport
Sends Vertica data to Kafka.
If Vertica successfully exports all of the rows of data to Kafka, this function returns zero rows. You can use the output of this function to copy failed messages to a secondary table for evaluation and reprocessing.
Syntax
SELECT KafkaExport(partitionColumn, keyColumn, valueColumn
USING PARAMETERS brokers='host[:port][,host...]',
topic='topicname'
[,kafka_conf='kafka_configuration_setting']
[,kafka_topic_conf='kafka_configuration_setting']
[,kafka_conf_secret='kafka_configuration_setting']
[,fail_on_conf_parse_error=Boolean])
OVER (partition_clause) FROM table;
Parameters
partitionColumn
- The target partition for the export. If you set this value to NULL, Vertica uses the default partitioning scheme. You can use the partition argument to send messages to partitions that map to Vertica segments.
keyColumn
- The user defined key value associated with the valueColumn. Use NULL to skip this argument.
valueColumn
- The message itself. The column is a LONG VARCHAR, allowing you to send up to 32MB of data to Kafka. However, Kafka may impose its own limits on message size.
brokers
- A string containing a comma-separated list of one or more host names or IP addresses (with optional port number) of brokers in the Kafka cluster.
topic
- The Kafka topic to which you are exporting.
kafka_conf
A JSON string of property/value pairs to pass directly to rdkafka, the library that Vertica uses to communicate with Kafka. This parameter directly sets global configuration properties that are not available through the Vertica integration with Kafka.
For details, see Directly setting Kafka library options.
kafka_topic_conf
A JSON string of property/value pairs to pass directly to rdkafka, the library that Vertica uses to communicate with Kafka. This parameter directly sets topic-level configuration properties that are not available through the Vertica integration with Kafka.
For details, see Directly setting Kafka library options.
kafka_conf_secret
Conceals sensitive configuration data that you must pass directly to the rdkafka library, such as passwords. This parameter accepts settings in the same format as kafka_conf. Values passed to this parameter are not logged or stored in system tables.
fail_on_conf_parse_error
Determines whether the function fails when kafka_conf contains incorrectly formatted options and values, or invalid configuration properties.
Default value: FALSE
For accepted option and value formats, see Directly setting Kafka library options.
For a list of valid configuration properties, see the rdkafka GitHub repository.
Example
The following example converts each row from the iot_report table into a JSON object, and exports it to a Kafka topic named iot-results.
The iot_report table contains the following columns and data:
=> SELECT * FROM iot_report;
server | date | location | id
--------+---------------------+-------------------------+----------
1 | 2016-10-11 04:09:28 | -14.86058, 112.75848 | 70982027
1 | 2017-07-02 12:37:48 | -21.42197, -127.17672 | 49494918
2 | 2017-01-20 03:05:24 | 37.17372, 136.14026 | 36670100
2 | 2017-07-29 11:38:37 | -38.99517, 171.72671 | 52049116
1 | 2017-10-19 14:04:33 | -71.72156, -36.27381 | 94328189
(5 rows)
To build the KafkaExport function, provide the following values to define the Kafka message:
- partitionColumn: Use the server column. Because the server column values are one-based and Kafka partitions are zero-based, subtract 1 from the server value.
- keyColumn: Use the id column. This requires that you explicitly cast the id value to a VARCHAR type.
- valueColumn: Each message formats the date and location columns as the key/value pairs in the exported JSON object. To convert these rows to JSON format, use the ROW function to convert the date and location columns to structured data. Then, pass the ROW function to the TO_JSON function to encode the data structure as a JSON object.
Complete the remaining required function arguments and execute the KafkaExport function. If it succeeds, it returns 0 rows:
=> SELECT KafkaExport(server - 1, id::VARCHAR, TO_JSON(ROW(date, location))
USING PARAMETERS brokers='broker01:9092,broker02:9092',
topic='iot-results')
OVER (PARTITION BEST)
FROM iot_report;
partition | key | message | failure_reason
-----------+-----+---------+----------------
(0 rows)
Use kafkacat to verify that the iot-results topic contains the JSON-formatted data:
$ /opt/vertica/packages/kafka/bin/kafkacat -C -t iot-results -b broker01:9092,broker02:9092
{"date":"2017-01-20 03:05:24","location":" 37.17372, 136.14026 "}
{"date":"2017-07-29 11:38:37","location":" -38.99517, 171.72671 "}
{"date":"2016-10-11 04:09:28","location":" -14.86058, 112.75848 "}
{"date":"2017-10-19 14:04:33","location":" -71.72156, -36.27381 "}
{"date":"2017-07-02 12:37:48","location":" -21.42197, -127.17672 "}
See also
Producing data using KafkaExport
4 - KafkaJSONParser
The KafkaJSONParser parses JSON-formatted Kafka messages and loads them into a regular Vertica table or a Vertica flex table.
Syntax
KafkaJSONParser(
[enforce_length=Boolean]
[, flatten_maps=Boolean]
[, flatten_arrays=Boolean]
[, start_point=string]
[, start_point_occurrence=integer]
[, omit_empty_keys=Boolean]
[, reject_on_duplicate=Boolean]
[, reject_on_materialized_type_error=Boolean]
[, reject_on_empty_key=Boolean]
[, key_separator=char]
[, suppress_nonalphanumeric_key_chars=Boolean]
)
enforce_length
- If set to TRUE, rejects the row if data being loaded is too wide to fit into its column. Defaults to FALSE, which truncates any data that is too wide to fit into its column.
flatten_maps
- If set to TRUE, flattens all JSON maps.
flatten_arrays
- If set to TRUE, flattens JSON arrays.
start_point
- Specifies the key in the JSON data that the parser should parse. The parser only extracts data that is within the value associated with the start_point key. It parses the values of all instances of the start_point key within the data.
start_point_occurrence
- Integer value indicating at which occurrence of the key specified by the start_point parameter the parser should begin parsing. For example, if you set this value to 4, the parser will only begin loading data from the fifth occurrence of the start_point key. Only has an effect if you also supply the start_point parameter.
omit_empty_keys
- If set to TRUE, omits any key from the load data that does not have a value set.
reject_on_duplicate
- If set to TRUE, rejects the row that contains duplicate key names. Key names are case-insensitive, so the keys "mykey" and "MyKey" are considered duplicates.
reject_on_materialized_type_error
- If set to TRUE, rejects the row if the data includes keys matching an existing materialized column and has a key that cannot be mapped into the materialized column's data type.
reject_on_empty_key
- If set to TRUE, rejects any row containing a key without a value.
key_separator
- A single character to use as the separator between key values instead of the default period (.) character.
suppress_nonalphanumeric_key_chars
- If set to TRUE, replaces all non-alphanumeric characters in JSON key values with an underscore (_) character.
See JSON data for more information.
The following example demonstrates loading JSON data from Kafka. The parameters in the statement define the load as follows:
- Load data into the pre-existing table named logs.
- The KafkaSource streams the data from a single partition in the source called server_log.
- The Kafka broker for the data load is running on the host named kafka01 on port 9092.
- KafkaSource stops loading data after either 10 seconds or on reaching the end of the stream, whichever happens first.
- The KafkaJSONParser flattens any arrays or maps in the JSON data.
=> COPY logs SOURCE KafkaSource(stream='server_log|0|0',
stop_on_eof=true,
duration=interval '10 seconds',
brokers='kafka01:9092')
PARSER KafkaJSONParser(flatten_arrays=True, flatten_maps=True);
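If each JSON message wraps its rows inside a top-level key, you can use start_point to parse just that portion of the message. The following is a minimal sketch; it assumes that the messages in server_log nest their records under a key named entries:
=> COPY logs SOURCE KafkaSource(stream='server_log|0|0',
                                stop_on_eof=true,
                                brokers='kafka01:9092')
   PARSER KafkaJSONParser(start_point='entries');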
5 - KafkaListManyTopics
Retrieves information about all topics from a Kafka broker. This function lists all of the topics defined in the Kafka cluster, as well as the number of partitions each topic contains and which brokers serve each topic.
Syntax
KafkaListManyTopics('broker:port[;...]'
[USING PARAMETERS
[kafka_conf='kafka_configuration_setting'
[, timeout=timeout_sec]])
broker
- The host name (or IP address) of a broker in the Kafka cluster.
port
- The port number on which the broker is running.
kafka_conf
A JSON string of property/value pairs to pass directly to rdkafka, the library that Vertica uses to communicate with Kafka. This parameter directly sets global configuration properties that are not available through the Vertica integration with Kafka.
For details, see Directly setting Kafka library options.
kafka_conf_secret
Conceals sensitive configuration data that you must pass directly to the rdkafka library, such as passwords. This parameter accepts settings in the same format as kafka_conf. Values passed to this parameter are not logged or stored in system tables.
timeout
Integer number of seconds to wait for a response from the Kafka cluster.
Example
=> \x
Expanded display is on.
=> SELECT KafkaListManyTopics('kafka01.example.com:9092')
OVER (PARTITION AUTO);
-[ RECORD 1 ]--+--------------------------------------------------
brokers | kafka01.example.com:9092,kafka02.example.com:9092
topic | __consumer_offsets
num_partitions | 50
-[ RECORD 2 ]--+--------------------------------------------------
brokers | kafka01.example.com:9092,kafka02.example.com:9092
topic | iot_data
num_partitions | 1
-[ RECORD 3 ]--+--------------------------------------------------
brokers | kafka01.example.com:9092,kafka02.example.com:9092
topic | test
num_partitions | 1
6 - KafkaListTopics
Gets the list of topics available from a Kafka broker.
Syntax
KafkaListTopics(USING PARAMETERS brokers='hostname:port[,hostname2:port2...]'
[, kafka_conf='kafka_configuration_setting']
[, timeout=timeout_sec])
brokers
- The host name and port number of the broker to query for a topic list. You can supply more than one broker as a comma-separated list. However, the returned list will only contain the topics served by the last broker in the list.
kafka_conf
A JSON string of property/value pairs to pass directly to rdkafka, the library that Vertica uses to communicate with Kafka. This parameter directly sets global configuration properties that are not available through the Vertica integration with Kafka.
For details, see Directly setting Kafka library options.
kafka_conf_secret
Conceals sensitive configuration data that you must pass directly to the rdkafka library, such as passwords. This parameter accepts settings in the same format as kafka_conf. Values passed to this parameter are not logged or stored in system tables.
timeout
Integer number of seconds to wait for a response from the Kafka cluster.
Example
=> SELECT KafkaListTopics(USING PARAMETERS brokers='kafka1-01.example.com:9092')
OVER ();
topic | num_partitions
-----------------------+----------------
test | 1
iot_data | 1
__consumer_offsets | 50
vertica_notifications | 1
web_hits | 1
(5 rows)
7 - KafkaOffsets
The KafkaOffsets user-defined transform function returns load operation statistics generated by the most recent invocation of KafkaSource. Query KafkaOffsets to see the metadata produced by your most recent load operation. You can query KafkaOffsets after each KafkaSource invocation to view information about that load. If you are using the scheduler, you can also view historical load information in the stream_microbatch_history table.
For each load operation, KafkaOffsets returns the following:
- source Kafka topic
- source Kafka partition
- starting offset
- ending offset
- number of messages loaded
- number of bytes read
- duration of the load operation
- end message
- end reason
The following example demonstrates calling KafkaOffsets to show partition information that was loaded using KafkaSource:
=> SELECT kpartition, start_offset, end_offset, msg_count, ending FROM (select KafkaOffsets() over()) AS stats ORDER BY kpartition;
kpartition | start_offset | end_offset | msg_count | ending
------------+--------------+------------+-----------+------------
0 | -2 | 9999 | 1068 | END_OFFSET
The output shows that KafkaSource loaded 1068 messages (rows) from Kafka in a single partition. The KafkaSource ended the data load because it reached the ending offset.
Note
The values shown in the start_offset column are exclusive (the message with the shown offset was not loaded) and the values in the end_offset column are inclusive (the message with the shown offset was loaded). This is the opposite of the offsets specified in the KafkaSource's stream parameter. The difference between the inclusiveness of KafkaSource's and KafkaOffsets's start and end offsets is based on the needs of the job scheduler. KafkaOffsets is primarily intended for the job scheduler's use, so the start and end offset values are defined so the scheduler can easily resume streaming from where the previous load left off.
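If you load with the scheduler, you can review the same kind of offset information for past microbatches in the scheduler's stream_microbatch_history table. A minimal sketch, assuming a scheduler configuration schema named stream_config:
=> SELECT * FROM stream_config.stream_microbatch_history LIMIT 5;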
8 - KafkaParser
The KafkaParser does not parse data loaded from Kafka. Instead, it passes the messages through as LONG VARCHAR values. Use this parser when you want to load raw Kafka messages into Vertica for further processing. You can use this parser as a catch-all for unsupported formats.
KafkaParser does not take any parameters.
Example
The following example loads raw messages from a Kafka topic named iot-data into a table named raw_iot.
=> CREATE TABLE raw_iot(message LONG VARCHAR);
CREATE TABLE
=> COPY raw_iot SOURCE KafkaSource(stream='iot-data|0|-2,iot-data|1|-2,iot-data|2|-2',
brokers='docd01:6667,docd03:6667', stop_on_eof=TRUE)
PARSER KafkaParser();
Rows Loaded
-------------
5000
(1 row)
=> select * from raw_iot limit 10;
message
------------------------------------
10039, 59, -68.951406, -19.270126
10042, 40, -82.688712, 4.7187705
10054, 6, -153.805268, -10.5173935
10054, 71, -135.613150, 58.286458
10081, 44, 130.288419, -77.344405
10104, -5, 77.882598, -56.600744
10132, 87, 103.530616, -69.672863
10135, 6, -121.420382, 15.3229855
10166, 77, -179.592211, 42.0477075
10183, 62, 17.225394, -55.6644765
(10 rows)
9 - KafkaSource
The KafkaSource UDL accesses data from a Kafka cluster. All Kafka parsers must use KafkaSource. Messages processed by KafkaSource must be at least one byte in length. KafkaSource writes an error message to vertica.log for zero-length messages.
The output of KafkaSource does not work directly with any of the non-Kafka parsers in Vertica (such as the FCSVPARSER). KafkaSource produces additional metadata about the stream that parsers must use to parse the data correctly. You must use filters such as KafkaInsertDelimiters to transform the data into a format that other parsers can process. See Parsing custom formats for an example.
You can cancel a running KafkaSource data load by using a close session function such as CLOSE_ALL_SESSIONS.
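For example, to cancel a running load by closing every open session (a blunt approach; use a more targeted close-session function if you need to keep other sessions alive):
=> SELECT CLOSE_ALL_SESSIONS();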
Syntax
KafkaSource(stream='topic_name|partition|start_offset[|end_offset]'[, param=value [,...] ] )
stream
- Required. Defines the data to be loaded as a comma-separated list of one or more partitions. Each partition is defined by three required values and one optional value separated by pipe characters (|):
  - topic_name: the name of the Kafka topic to load data from. You can read from different Kafka topics in the same stream parameter, with some limitations. See Loading from multiple topics in the same stream parameter below for more information.
  - partition: the partition in the Kafka topic to copy.
  - start_offset: the offset in the Kafka topic where the load will begin. This offset is inclusive (the message with the offset start_offset is loaded). See Special starting offset values below for additional options.
  - end_offset: the optional offset where the load should end. This offset is exclusive (the message with the offset end_offset will not be loaded).
    To end a load using end_offset, you must supply an ending offset value for all partitions in the stream parameter. Attempting to set an ending offset for some partitions and not set offset values for others results in an error.
    If you do not specify an ending offset, you must supply at least one other ending condition using stop_on_eof or duration.
brokers
- A comma-separated list of host:port pairs of the brokers in the Kafka cluster. Vertica recommends running Kafka on a different machine than Vertica.
Default: localhost:9092
duration
- An INTERVAL that specifies the duration of the frame. After this specified amount of time, KafkaSource terminates the COPY statements. If this parameter is not set, you must set at least one other ending condition by using stop_on_eof or specify an ending offset instead. See Duration Note below for more information.
executionparallelism
- The number of threads to use when loading data. Normally, you set this to an integer value between 1 and the number of partitions the node is loading from. Setting this parameter to a reduced value limits the number of threads used to process any COPY statement. It also increases the throughput of short queries issued in the pool, especially if the queries are executed concurrently.
If you do not specify this parameter, Vertica automatically creates a thread for every partition, up to the limit allowed by the resource pool.
If the value you specify for the KafkaSource is lower than the value specified for the scheduler resource pool, the KafkaSource value applies. This value cannot exceed the value specified for the scheduler's resource pool.
stop_on_eof
- Determines whether KafkaSource should terminate the COPY statement after it reaches the end of a file. If this value is not set, you must set at least one other ending condition using duration or by supplying ending offsets instead.
Default: FALSE
group_id
The name of the Kafka consumer group to which Vertica reports its progress consuming messages. Set this value to NULL to disable progress reports to a Kafka consumer group. For details, see Monitoring Vertica message consumption with consumer groups.
Default: vertica_database-name
kafka_conf
A JSON string of property/value pairs to pass directly to rdkafka, the library that Vertica uses to communicate with Kafka. This parameter directly sets global configuration properties that are not available through the Vertica integration with Kafka.
For details, see Directly setting Kafka library options.
kafka_topic_conf
A JSON string of property/value pairs to pass directly to rdkafka, the library that Vertica uses to communicate with Kafka. This parameter directly sets topic-level configuration properties that are not available through the Vertica integration with Kafka.
For details, see Directly setting Kafka library options.
kafka_conf_secret
Conceals sensitive configuration data that you must pass directly to the rdkafka library, such as passwords. This parameter accepts settings in the same format as kafka_conf. Values passed to this parameter are not logged or stored in system tables.
fail_on_conf_parse_error
Determines whether the function fails when kafka_conf contains incorrectly formatted options and values, or invalid configuration properties.
Default value: FALSE
For accepted option and value formats, see Directly setting Kafka library options.
For a list of valid configuration properties, see the rdkafka GitHub repository.
Special starting offset values
The start_offset portion of the stream parameter lets you start loading messages from a specific point in the topic's partition. It also accepts one of two special offset values:
- -2 tells KafkaSource to start loading at the earliest available message in the topic's partition. This value is useful when you want to load as many messages as you can from the Kafka topic's partition.
- -3 tells KafkaSource to start loading from the consumer group's saved offset. If the consumer group does not have a saved offset, it starts loading from the earliest available message in the topic partition. See Monitoring Vertica message consumption with consumer groups for more information.
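For example, the following sketch resumes loading from a consumer group's saved offset. It assumes a consumer group named vertica_example_group, a flex table named web_table, and a topic named web_hits (the same names used in the example later in this section):
=> COPY web_table
   SOURCE KafkaSource(stream='web_hits|0|-3',
                      group_id='vertica_example_group',
                      brokers='kafka01.example.com:9092',
                      stop_on_eof=true)
   PARSER KafkaJSONParser();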
Loading from multiple topics in the same stream parameter
You can load from multiple Kafka topics in a single stream parameter (see the sketch after this list) as long as you follow these guidelines:
- The data for the topics must be in the same format because you pass the data from KafkaSource to a single parser. For example, you cannot load data from one topic that is in Avro format and another in JSON format.
- Similarly, you need to be careful if you are loading Avro data and specifying an external schema from a registry. The Avro parser accepts a single schema per data load. If the data from the separate topics have different schemas, then all of the data from one of the topics will be rejected.
- The data in the different topics should have the same (or very similar) schemas, especially if you are loading data into a traditional Vertica table. While you can load data with different schemas into a flex table, there are only a few scenarios where it makes sense to combine dissimilar data into a single table.
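The following sketch reads from two topics in one stream parameter. It assumes a second topic named web_gets whose messages use the same JSON layout as web_hits:
=> COPY web_table
   SOURCE KafkaSource(stream='web_hits|0|-2,web_gets|0|-2',
                      brokers='kafka01.example.com:9092',
                      stop_on_eof=true)
   PARSER KafkaJSONParser();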
Duration note
The duration parameter applies to the length of time that Vertica allows the KafkaSource function to run. It usually reflects the amount of time the overall load statement takes. However, if KafkaSource is loading a large volume of data or the data needs extensive processing and parsing, the overall runtime of the query can exceed the amount of time specified in duration.
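For example, the following sketch limits the KafkaSource frame to roughly 30 seconds; the COPY statement itself can run slightly longer while parsing finishes. It reuses the web_table and web_hits names from the example below:
=> COPY web_table
   SOURCE KafkaSource(stream='web_hits|0|-2',
                      brokers='kafka01.example.com:9092',
                      duration=INTERVAL '30 seconds')
   PARSER KafkaJSONParser();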
Example
The following example demonstrates calling KafkaSource to load data from Kafka into an existing flex table named web_table with the following options:
- The stream is named web_hits, which has a single partition.
- The load starts at the earliest message in the stream (identified by passing -2 as the start offset).
- The load ends when it reaches the message with offset 1000.
- The Kafka cluster's brokers are kafka01 and kafka03 in the example.com domain.
- The brokers are listening on port 9092.
- The load ends if it reaches the end of the stream before reaching the message with offset 1000 (because stop_on_eof is set). If you do not supply this option, the connector waits until Kafka sends a message with offset 1000.
- The loaded data is sent to the KafkaJSONParser for processing.
=> COPY web_table
SOURCE KafkaSource(stream='web_hits|0|-2|1000',
brokers='kafka01.example.com:9092,kafka03.example.com:9092',
stop_on_eof=true)
PARSER KafkaJSONParser();
Rows Loaded
-------------
1000
(1 row)
To view details about this load operation, query KafkaOffsets. KafkaOffsets returns metadata about the messages that Vertica consumed from Kafka during the most recent KafkaSource invocation:
=> SELECT KafkaOffsets() OVER();
ktopic | kpartition | start_offset | end_offset | msg_count | bytes_read | duration | ending | end_msg
----------+------------+--------------+------------+-----------+------------+-----------------+------------+-------------------
web_hits | 0 | 0 | 999 | 1000 | 197027 | 00:00:00.385365 | END_OFFSET | Last message read
(1 row)
The msg_count column verifies that Vertica loaded 1000 messages, and the ending column indicates that Vertica stopped consuming messages when it reached the message with the offset 1000.
10 - KafkaTopicDetails
Retrieves information about the specified topic from one or more Kafka brokers. This function lists details about the topic partitions, and the Kafka brokers that serve each partition in the Kafka cluster.
Syntax
KafkaTopicDetails(USING PARAMETERS brokers='hostname:port[,hostname2:port2...]'
, topic=topic_name
[, kafka_conf='option=value[;option2=value2...]']
[, timeout=timeout_sec])
brokers
- The host name (or IP address) of a broker in the Kafka cluster.
port
- The port number on which the broker is running.
topic
- Kafka topic that you want details for.
kafka_conf
- A semicolon-delimited list of option=value pairs to pass directly to the rdkafka library. This is the library Vertica uses to communicate with Kafka. You can use this parameter to directly set configuration options that are not available through the Vertica integration with Kafka. See Directly setting Kafka library options for details.
kafka_conf_secret
Conceals sensitive configuration data that you must pass directly to the rdkafka library, such as passwords. This parameter accepts settings in the same format as kafka_conf. Values passed to this parameter are not logged or stored in system tables.
timeout
- Integer number of seconds to wait for a response from the Kafka cluster.
Example
=> SELECT KafkaTopicDetails(USING PARAMETERS brokers='kafka1-01.example.com:9092',topic='iot_data') OVER();
partition_id | lead_broker | replica_brokers | in_sync_replica_brokers
--------------+-------------+-----------------+-------------------------
0 | 0 | 0 | 0
1 | 1 | 1 | 1
2 | 0 | 0 | 0
3 | 1 | 1 | 1
4 | 0 | 0 | 0
(5 rows)