Directly setting Kafka library options

Vertica relies on the open source rdkafka library to communicate with Apache Kafka. This library contains many configuration properties that control how Vertica and Kafka interact. You set the most common rdkafka library properties through the settings in the vkconfig utility and the Kafka integration functions.

Some rdkafka properties cannot be set directly from within Vertica. Under normal circumstances, you do not need to change them. However, if you need to set a specific rdkafka property that is not directly available from Vertica, you can set it through one of the following options or parameters:

  • kafka_conf: sets global configuration options. This is available as a vkconfig option and a function parameter.
  • kafka_topic_conf: sets topic-level configuration options. This is available as a parameter in the KafkaExport and KafkaSource functions.
  • kafka_conf_secret: sets options that accept sensitive information, such as passwords. This is available as a vkconfig option and a function parameter. Values passed to kafka_conf_secret are not logged or stored in system tables.

For a list of global and topic rdkafka configuration properties, see the rdkafka repository on GitHub.

Setting vkconfig properties

The kafka_conf option sets rdkafka properties when you automatically consume data from Kafka with the vkconfig scheduler. You can set its values in the following ways, listed in descending order of precedence:

  1. The --kafka_conf option of the vkconfig utility. This option can be set in the cluster, source, launch, and sync tools. Note that the setting applies only to the vkconfig utility call in which you supply it; it does not persist or carry over to other vkconfig utility calls. For example, if you need to supply an option to both the cluster and source tools, you must supply the kafka_conf option to each of them.
  2. The Linux environment variable VERTICA_RDKAFKA_CONF_KAFKA_CLUSTER set on the host where you run the vkconfig utility. The KAFKA_CLUSTER portion of the variable name is the name of a Kafka cluster you have defined using vkconfig's cluster tool. The settings in this environment variable affect only the specific Kafka cluster you name in KAFKA_CLUSTER.
  3. The Linux environment variable VERTICA_RDKAFKA_CONF set on the host where you run the vkconfig utility.

All of these options cascade, so setting an option using the --kafka_conf argument to the cluster tool overrides the same option that was set in the environment variables.
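
The cascade described above can be modeled as a layered dictionary merge. The following Python sketch is illustrative only and is not Vertica's implementation. The function name effective_kafka_conf is hypothetical, and the sketch assumes that the cluster name is appended to the variable name as written and that settings are merged property by property, as the override behavior described above suggests:

```python
import json


def effective_kafka_conf(cli_conf, env, cluster_name):
    """Model the documented precedence: --kafka_conf, then the
    cluster-specific environment variable, then the global one.

    Hypothetical helper for illustration; not part of Vertica or vkconfig.
    """
    merged = {}
    # Apply the lowest-precedence layer first so later layers override it.
    layers = [
        env.get("VERTICA_RDKAFKA_CONF"),
        # Assumption: the cluster name is appended to the prefix verbatim.
        env.get("VERTICA_RDKAFKA_CONF_" + cluster_name),
        cli_conf,
    ]
    for layer in layers:
        if layer:
            merged.update(json.loads(layer))
    return merged


# Example values only; StreamCluster1 is the cluster name used in this page.
env = {
    "VERTICA_RDKAFKA_CONF": '{"api.version.request": true, "check.crcs": true}',
    "VERTICA_RDKAFKA_CONF_StreamCluster1": '{"api.version.request": false}',
}
result = effective_kafka_conf('{"check.crcs": false}', env, "StreamCluster1")
print(result)
```

In this model, the cluster-specific variable overrides api.version.request from the global variable, and the command-line value overrides check.crcs from both.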

Option and parameter formats

All kafka_* options and parameters accept a JSON string. You can pass configuration property settings in one of the following formats:

  • One or more property/value pairs:

    # vkconfig scheduler option
    --kafka_conf '{"option1":value1[, "option2":value2...]}'
    
    # function parameter
    kafka_conf='{"option1":value1[, "option2":value2...]}'
    
  • A single property with multiple values:

    # vkconfig scheduler option
    --kafka_conf '{"option1":"value1[;value2...]"}'
    
    # function parameter
    kafka_conf='{"option1":"value1[;value2...]"}'
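
Hand-writing JSON inside shell or SQL quotes is error-prone, so one option is to generate the string programmatically. The following is a minimal Python sketch that uses the standard json module. The helper name kafka_conf_json is hypothetical, and joining list values with semicolons simply mirrors the multi-value format shown above:

```python
import json


def kafka_conf_json(props):
    """Serialize rdkafka properties to a compact JSON string.

    List or tuple values are joined with ';' into a single string value.
    Hypothetical helper for illustration; not part of Vertica or vkconfig.
    """
    normalized = {}
    for name, value in props.items():
        if isinstance(value, (list, tuple)):
            normalized[name] = ";".join(str(v) for v in value)
        else:
            normalized[name] = value
    # Compact separators give the spacing-free form used in the formats above.
    return json.dumps(normalized, separators=(",", ":"))


conf = kafka_conf_json({"api.version.request": False, "check.crcs": True})
multi = kafka_conf_json({"option1": ["value1", "value2"]})

# Wrap in single quotes for the vkconfig option or the function parameter.
print("--kafka_conf '" + conf + "'")
print("kafka_conf='" + multi + "'")
```

Python booleans are serialized as the JSON literals true and false, which matches the unquoted values used in the examples on this page.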
    

Examples

The following example demonstrates disabling rdkafka's api.version.request property when manually loading messages using KafkaSource. You should always disable this property when accessing a Kafka cluster running version 0.9 or earlier. See Configuring Vertica for Apache Kafka version 0.9 and earlier for more information.

=> CREATE FLEX TABLE iot_data();
CREATE TABLE
=> COPY public.iot_data SOURCE KafkaSource(stream='iot_json|0|-2',
                                           brokers='kafka-01.example.com:9092',
                                           stop_on_eof=True,
                                           kafka_conf='{"api.version.request":false}')
        PARSER KafkaJSONParser();
 Rows Loaded
-------------
        5000
(1 row)

This example demonstrates setting two properties with a JSON string when calling the cluster tool. It disables the api.version.request property and enables CRC checks of messages from Kafka using the check.crcs property:

$ vkconfig cluster --create --cluster StreamCluster1 \
                   --hosts kafka01.example.com:9092,kafka02.example.com:9092 \
                   --conf myscheduler.config \
                   --kafka_conf '{"api.version.request":false, "check.crcs":true}'

The following example demonstrates setting the same properties using an environment variable:

$ export VERTICA_RDKAFKA_CONF='{"api.version.request":false, "check.crcs":true}'
$ vkconfig cluster --create --cluster StreamCluster1 \
                   --hosts kafka01.example.com:9092,kafka02.example.com:9092 \
                   --conf myscheduler.config