Directly setting Kafka library options

Vertica relies on the open source rdkafka library to communicate with Apache Kafka. This library contains many options for controlling how Vertica and Kafka interact. You set the most common rdkafka library options through the settings in the vkconfig utility and the Kafka integration functions such as KafkaSource.

Some rdkafka settings cannot be set directly from within Vertica. Under normal circumstances, you do not need to change them. However, if you need to set a specific rdkafka setting that is not directly available from Vertica, you can pass options straight to the rdkafka library through the kafka_conf option.

The kafka_conf argument is supported when using a scheduler to load data from Kafka. You can set the values in the following ways (listed from lowest to highest precedence):

  • The Linux environment variable VERTICA_RDKAFKA_CONF set on the host where you run the vkconfig utility.

  • The Linux environment variable VERTICA_RDKAFKA_CONF_KAFKA_CLUSTER set on the host where you run the vkconfig utility. The KAFKA_CLUSTER portion of the variable name is the name of a Kafka cluster you have defined using vkconfig's cluster utility. The settings in this environment variable only affect the specific Kafka cluster you name in KAFKA_CLUSTER.

  • The --kafka_conf option of the vkconfig utility. This option can be set in the cluster, source, launch, and sync tools. The setting applies only to the vkconfig call in which you supply it; it does not carry over to other vkconfig calls. For example, if you need to supply an option to both the cluster and source tools, you must pass the kafka_conf option to each of them.

All of these options cascade, so setting an option using the --kafka_conf argument to the cluster tool overrides the same option that was set in the environment variables.
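
As a rough sketch of how the three levels might be combined on the host where you run vkconfig, the following shell snippet sets one value at each level. It assumes that the cluster name is appended to VERTICA_RDKAFKA_CONF_ exactly as written (StreamCluster1 here, the cluster name used in the examples on this page). The cli_conf variable is a hypothetical helper, and the vkconfig call is left as a comment so that the snippet runs without Vertica installed.

```shell
# Lowest precedence: applies to every Kafka cluster the scheduler uses.
export VERTICA_RDKAFKA_CONF='{"api.version.request":false, "check.crcs":true}'

# Middle precedence: applies only to the cluster named StreamCluster1.
# Assumption: the cluster name is appended to the variable name verbatim.
export VERTICA_RDKAFKA_CONF_StreamCluster1='{"check.crcs":false}'

# Highest precedence: the value passed to --kafka_conf on a single call.
# cli_conf is a hypothetical helper variable, not something vkconfig reads.
cli_conf='{"api.version.request":true}'

# Show what each level contributes.
printf 'all clusters:   %s\n' "$VERTICA_RDKAFKA_CONF"
printf 'StreamCluster1: %s\n' "$VERTICA_RDKAFKA_CONF_StreamCluster1"
printf '--kafka_conf:   %s\n' "$cli_conf"

# The call itself, commented out so the sketch runs anywhere:
# vkconfig cluster --create --cluster StreamCluster1 \
#                  --hosts kafka01.example.com:9092,kafka02.example.com:9092 \
#                  --conf myscheduler.config \
#                  --kafka_conf "$cli_conf"
```

If the settings cascade as described above, this call would run with api.version.request taken from --kafka_conf (true) and check.crcs taken from the cluster-specific variable (false).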

You can also set rdkafka options when you call KafkaExport, KafkaSource, and several other Kafka integration functions directly. These functions accept a parameter named kafka_conf.

The kafka_conf option settings

The kafka_conf vkconfig option accepts a JSON object with settings in the following formats:

  • One or more option/value pairs:

    --kafka_conf '{"option1":value1[, "option2":value2...]}'
    
  • A single option with multiple values:

    --kafka_conf '{"option1":"value1[;value2...]"}'
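
The second format can be assembled in a script instead of typed by hand. The following is a minimal sketch, not a required approach: option1 and value1 through value3 are the placeholder names from the format description above, and the values are assumed to contain no characters that need JSON escaping.

```shell
# Join the placeholder values with semicolons, as the multi-value format expects.
joined=""
for v in value1 value2 value3; do
  # Prepend the previous values and a ";" only when joined is non-empty.
  joined="${joined:+$joined;}$v"
done

# Wrap the joined values in a one-key JSON object.
kafka_conf="{\"option1\":\"$joined\"}"

printf '%s\n' "$kafka_conf"
# prints {"option1":"value1;value2;value3"}
```

On the command line, wrap the resulting string in quotes (for example, --kafka_conf "$kafka_conf") so that the shell passes the JSON object to vkconfig as a single argument.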
    

Vertica provides the kafka_conf_secret parameter to pass sensitive configuration settings. This parameter accepts values in the same format as kafka_conf. Values passed to kafka_conf_secret are not logged or stored in system tables.
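
One way to use the two parameters together is to keep non-sensitive settings in kafka_conf and move credentials into kafka_conf_secret. The sketch below only builds the two JSON strings; how you then pass them depends on the tool or function you call. KAFKA_SASL_PASSWORD is a hypothetical environment variable, its fallback value is a placeholder, and the password is assumed to contain no characters that need JSON escaping. sasl.password is an rdkafka configuration option.

```shell
# Hypothetical variable: in practice the password would come from a secret
# store or a prompt. The fallback exists only so the sketch runs as-is.
KAFKA_SASL_PASSWORD="${KAFKA_SASL_PASSWORD:-example-password}"

# Non-sensitive settings stay in kafka_conf.
kafka_conf='{"api.version.request":false}'

# Sensitive settings use the same JSON format but go in kafka_conf_secret.
kafka_conf_secret=$(printf '{"sasl.password":"%s"}' "$KAFKA_SASL_PASSWORD")

# Print the non-sensitive value, and only the length of the secret one.
printf 'kafka_conf=%s\n' "$kafka_conf"
printf 'kafka_conf_secret is %s characters long\n' "${#kafka_conf_secret}"
```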

See the rdkafka project on GitHub for a list of the configuration options supported by the rdkafka library.

Example

The following example demonstrates disabling rdkafka's api.version.request option when manually loading messages using KafkaSource. You should always disable this option when accessing a Kafka cluster running version 0.9 or earlier. See Configuring Vertica for Apache Kafka version 0.9 and earlier for more information.

=> CREATE FLEX TABLE iot_data();
CREATE TABLE
=> COPY public.iot_data SOURCE KafkaSource(stream='iot_json|0|-2',
                                           brokers='kafka-01.example.com:9092',
                                           stop_on_eof=True,
                                           kafka_conf='{"api.version.request":false}')
        PARSER KafkaJSONParser();
 Rows Loaded
-------------
        5000
(1 row)

This example demonstrates setting two options with a JSON object when calling the cluster tool. It disables the api.version.request option and enables CRC checks of messages from Kafka using the check.crcs option:

$ vkconfig cluster --create --cluster StreamCluster1 \
                   --hosts kafka01.example.com:9092,kafka02.example.com:9092 \
                   --conf myscheduler.config \
                   --kafka_conf '{"api.version.request":false, "check.crcs":true}'

The following example demonstrates setting the same options using an environment variable:

$ export VERTICA_RDKAFKA_CONF='{"api.version.request":false, "check.crcs":true}'
$ vkconfig cluster --create --cluster StreamCluster1 \
                   --hosts kafka01.example.com:9092,kafka02.example.com:9092 \
                   --conf myscheduler.config