Configuring Vertica and Kafka

Both Vertica and Kafka have settings you can use to optimize your streaming data loads. The topics in this section explain these settings.

1 - Kafka and Vertica configuration settings

You can fine-tune configuration settings for Vertica and Kafka that optimize performance.

The Vertica and Kafka integration uses rdkafka version 0.11.6. Unless otherwise noted, the following configuration settings use the default values from the librdkafka configuration properties. For instructions on how to override the default values, see Directly setting Kafka library options.

Vertica producer settings

These settings change how Vertica produces data for Kafka with the KafkaExport function and notifiers.

queue.buffering.max.messages
Size of the Vertica producer queue. If Vertica generates too many messages too quickly, the queue can fill, resulting in dropped messages. Increasing this value consumes more memory, but reduces the chance of lost messages.

Defaults:

  • KafkaExport: 1000

  • Notifiers: 10000

queue.buffering.max.ms
Frequency with which Vertica flushes the producer message queue. Lower values decrease latency at the cost of throughput. Higher values increase throughput, but can cause the producer queue (set by queue.buffering.max.messages) to fill more frequently, resulting in dropped messages.

Default: 100 ms

message.max.bytes
Maximum size of a Kafka protocol request message batch. This value should be the same on your sources, brokers, and producers.
message.send.max.retries
Number of attempts the producer makes to deliver the message to a broker. Higher values increase the chance of success.
retry.backoff.ms
Interval Vertica waits before resending a failed message.
request.required.acks
Number of broker replica acknowledgments Kafka requires before it considers message delivery successful. Requiring acknowledgments increases latency. Removing acknowledgments increases the risk of message loss.
request.timeout.ms
Interval that the producer waits for a response from the broker. Broker response time is affected by server load and the number of message acknowledgments you require.
Higher values increase latency.
compression.type
Compression algorithm used to encode data before sending it to a broker. Compression helps to reduce the network footprint of your Vertica producers and increase disk utilization. Vertica supports gzip and snappy.
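
For example, the following sketch passes two of these settings to KafkaExport through its kafka_conf parameter. The table name (iot_report), its columns (partition, key, message), and the topic name are hypothetical:

=> SELECT KafkaExport(partition, key, message
                      USING PARAMETERS brokers='kafka01.example.com:9092',
                                       topic='iot_results',
                                       kafka_conf='{"queue.buffering.max.messages":10000, "compression.type":"gzip"}')
   OVER (PARTITION BEST) FROM iot_report;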

Kafka broker settings

Kafka brokers receive messages from producers and distribute them among Kafka consumers. Configure these settings on the brokers themselves. These settings function independently of your producer and consumer settings. For detailed information on Apache Kafka broker settings, refer to the Apache Kafka documentation.

message.max.bytes
Maximum size of a Kafka protocol request message batch. This value should be the same on your sources, brokers, and producers.
num.io.threads
Number of network threads the broker uses to receive and process requests. More threads can increase your concurrency.
num.network.threads
Number of network threads the broker uses to accept network requests. More threads can increase your concurrency.
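
These are standard Kafka broker properties, set in each broker's server.properties file. For example (the values shown are illustrative, not recommendations):

# Keep the batch size consistent with producers and consumers
message.max.bytes=25165824
# Thread counts sized for a moderately busy broker
num.io.threads=16
num.network.threads=8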

Vertica consumer settings

The following settings change how Vertica acts when it consumes data from Kafka. You can set these values using the kafka_conf parameter on the KafkaSource UDL when directly executing a COPY statement. For schedulers, use the --message_max_bytes setting in the scheduler tool.

message.max.bytes
Maximum size of a Kafka protocol request message batch. Set this value high enough that the overhead of fetching batches of messages does not interfere with loading data. Defaults to 24MB for newly created load specs.
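
For example, the following sketch sets message.max.bytes to 24MB (25165824 bytes) for a manual load, reusing the stream, broker, and table names from examples elsewhere in this section:

=> COPY public.iot_data SOURCE KafkaSource(stream='iot_json|0|-2',
                                           brokers='kafka-01.example.com:9092',
                                           stop_on_eof=True,
                                           kafka_conf='{"message.max.bytes":25165824}')
        PARSER KafkaJSONParser();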

2 - Directly setting Kafka library options

Vertica relies on the open source rdkafka library to communicate with Apache Kafka. This library contains many configuration properties that control how Vertica and Kafka interact. You set the most common rdkafka library properties through the settings in the vkconfig utility and the Kafka integration functions.

There are some rdkafka properties that you cannot directly set from within Vertica. Under normal circumstances, you do not need to change them. However, if you find that you need to set a specific rdkafka property that is not directly available from Vertica, you can set the property directly through one of the following options or parameters:

  • kafka_conf: sets global configuration options. This is available as a vkconfig option and a function parameter.
  • kafka_topic_conf: sets topic-level configuration options. This is available as a parameter in the KafkaExport and KafkaSource functions.
  • kafka_conf_secret: sets options that accept sensitive information, such as passwords. This is available as a vkconfig option and a function parameter. Values passed to kafka_conf_secret are not logged or stored in system tables.

For a list of global and topic rdkafka configuration properties, see the rdkafka repository on GitHub.
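
For example, the following sketch (assuming a hypothetical SASL-enabled broker, user name, and password) uses kafka_conf for the connection security properties and kafka_conf_secret for the password, so the password is neither logged nor stored in system tables:

=> COPY public.iot_data SOURCE KafkaSource(stream='iot_json|0|-2',
                                           brokers='kafka01.example.com:9093',
                                           stop_on_eof=True,
                                           kafka_conf='{"security.protocol":"SASL_SSL", "sasl.mechanisms":"PLAIN", "sasl.username":"vertica_user"}',
                                           kafka_conf_secret='{"sasl.password":"my_password"}')
        PARSER KafkaJSONParser();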

Setting vkconfig properties

The kafka_conf option sets rdkafka properties when you automatically consume data from Kafka with the vkconfig scheduler. You can set its values in the following ways, listed in descending order of precedence:

  1. The --kafka_conf option of the vkconfig utility. This option can be set in the cluster, source, launch, and sync tools. Note that the setting applies only to that vkconfig utility call; it does not carry over to other vkconfig utility calls. For example, if you need to supply an option to the cluster and source tools, you must supply the kafka_conf option to both of them.
  2. The Linux environment variable VERTICA_RDKAFKA_CONF_KAFKA_CLUSTER set on the host where you run the vkconfig utility. The KAFKA_CLUSTER portion of the variable name is the name of a Kafka cluster you have defined using vkconfig's cluster utility. The settings in this environment variable only affect the specific Kafka cluster you name in KAFKA_CLUSTER.
  3. The Linux environment variable VERTICA_RDKAFKA_CONF set on the host where you run the vkconfig utility.

All of these options cascade, so setting an option using the --kafka_conf argument to the cluster tool overrides the same option that was set in the environment variables.
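
For example, in the following sketch, both the environment variable and the --kafka_conf argument set rdkafka's check.crcs property; the value passed to the cluster tool (true) takes precedence for that call:

$ export VERTICA_RDKAFKA_CONF='{"check.crcs":false}'
$ vkconfig cluster --create --cluster StreamCluster1 \
                   --hosts kafka01.example.com:9092 \
                   --conf myscheduler.config \
                   --kafka_conf '{"check.crcs":true}'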

Option and parameter formats

All kafka_* options and parameters accept a JSON string. You can pass configuration property settings in one of the following formats:

  • One or more property/value pairs:

    # vkconfig scheduler option
    --kafka_conf '{"option1":value1[, "option2":value2...]}'
    
    # function parameter
    kafka_conf='{"option1":value1[, "option2":value2...]}'
    
  • A single property with multiple values:

    # vkconfig scheduler option
    --kafka_conf '{"option1":"value1[;value2...]"}'
    
    # function parameter
    kafka_conf='{"option1":"value1[;value2...]"}'
    

Examples

The following example demonstrates disabling rdkafka's api.version.request property when manually loading messages using KafkaSource. You should always disable this property when accessing a Kafka cluster running version 0.9 or earlier. See Configuring Vertica for Apache Kafka version 0.9 and earlier for more information.

=> CREATE FLEX TABLE iot_data();
CREATE TABLE
=> COPY public.iot_data SOURCE KafkaSource(stream='iot_json|0|-2',
                                           brokers='kafka-01.example.com:9092',
                                           stop_on_eof=True,
                                           kafka_conf='{"api.version.request":false}')
        PARSER KafkaJSONParser();
 Rows Loaded
-------------
        5000
(1 row)

This example demonstrates setting two properties with a JSON string when calling the cluster tool. It disables the api.version.request property and enables CRC checks of messages from Kafka using the check.crcs property:

$ vkconfig cluster --create --cluster StreamCluster1 \
                   --hosts kafka01.example.com:9092,kafka02.example.com:9092 \
                   --conf myscheduler.config \
                   --kafka_conf '{"api.version.request":false, "check.crcs":true}'

The following example demonstrates setting the same properties using an environment variable:

$ export VERTICA_RDKAFKA_CONF='{"api.version.request":false, "check.crcs":true}'
$ vkconfig cluster --create --cluster StreamCluster1 \
                   --hosts kafka01.example.com:9092,kafka02.example.com:9092 \
                   --conf myscheduler.config

3 - Configuring Vertica for Apache Kafka version 0.9 and earlier

Apache Kafka version 0.10 introduced a new feature that allows consumers to determine which version of the Kafka API the Kafka brokers support. Consumers that support this feature send an initial API version query to the Kafka broker to determine the API version to use when communicating with it. Kafka brokers running version 0.9.0 or earlier cannot respond to this query. If the consumer does not receive a reply from the Kafka broker within a timeout period (10 seconds by default), the consumer can assume the Kafka broker is running Kafka version 0.9.0 or earlier.

The Vertica integration with Kafka supports this API query feature starting in version 9.1.1. This API check can cause problems if you are connecting Vertica to a Kafka cluster running 0.9.0 or earlier: you may notice poor performance when loading messages from Kafka, and the 10-second API request timeout can cause parts of the Kafka integration to time out and report errors.

For example, if you run the vkconfig source utility to configure a source on a Kafka 0.9 cluster, you may get the following error:

$ vkconfig source --create --conf weblog.conf --cluster kafka_weblog --source web_hits
Exception in thread "main" com.vertica.solutions.kafka.exception.ConfigurationException:
ERROR: [[Vertica][VJDBC](5861) ERROR: Error calling processPartition() in
User Function KafkaListTopics at [/data/build-centos6/qb/buildagent/workspace/jenkins2/PrimaryBuilds/build_master/build/udx/supported/kafka/KafkaUtil.cpp:173],
error code: 0, message: Error getting metadata: [Local: Broker transport failure]]
at com.vertica.solutions.kafka.model.StreamSource.validateConfiguration(StreamSource.java:184)
at com.vertica.solutions.kafka.model.StreamSource.setFromMapAndValidate(StreamSource.java:130)
at com.vertica.solutions.kafka.model.StreamModel.<init>(StreamModel.java:89)
at com.vertica.solutions.kafka.model.StreamSource.<init>(StreamSource.java:39)
at com.vertica.solutions.kafka.cli.SourceCLI.getNewModel(SourceCLI.java:53)
at com.vertica.solutions.kafka.cli.SourceCLI.getNewModel(SourceCLI.java:15)
at com.vertica.solutions.kafka.cli.CLI.run(CLI.java:56)
at com.vertica.solutions.kafka.cli.CLI._main(CLI.java:132)
at com.vertica.solutions.kafka.cli.SourceCLI.main(SourceCLI.java:25)
Caused by: java.sql.SQLNonTransientException: [Vertica][VJDBC](5861)
ERROR: Error calling processPartition() in User Function KafkaListTopics at [/data/build-centos6/qb/buildagent/workspace/jenkins2/PrimaryBuilds/build_master/build/udx/supported/kafka/KafkaUtil.cpp:173],
error code: 0, message: Error getting metadata: [Local: Broker transport failure]
at com.vertica.util.ServerErrorData.buildException(Unknown Source)
                           . . .

Disabling the API version request when using the streaming job scheduler

To avoid these problems, you must disable the API version request feature in the rdkafka library that Vertica uses to communicate with Kafka. The easiest way to disable this setting when using the scheduler is to set a Linux environment variable on the host on which you run the vkconfig script. Using the environment variable ensures that each call to vkconfig includes the setting to disable the API version request. The vkconfig script checks two variables:

  • VERTICA_RDKAFKA_CONF applies to all Kafka clusters that the scheduler running on the host communicates with.

  • VERTICA_RDKAFKA_CONF_CLUSTER_NAME applies to just the Kafka cluster named CLUSTER_NAME. Use this variable if your scheduler communicates with several Kafka clusters, some of which are not running version 0.9 or earlier.

If you set both variables, the cluster-specific one takes precedence. This behavior is useful if most of the clusters your scheduler connects to are running Kafka 0.9 or earlier, but a few run 0.10 or later. In this case, you can disable the API version check for most clusters using the VERTICA_RDKAFKA_CONF variable and re-enable the check for specific clusters using VERTICA_RDKAFKA_CONF_CLUSTER_NAME.

If only a few of your Kafka clusters run version 0.9 or earlier, you can set the cluster-specific variables for them alone, and leave the default values in place for the rest of your clusters.
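
For example, suppose most of the clusters your scheduler communicates with run Kafka 0.9, but one hypothetical cluster named new_kafka runs 0.10. You could disable the API version check globally and re-enable it for just that cluster:

$ export VERTICA_RDKAFKA_CONF='{"api.version.request":false}'
$ export VERTICA_RDKAFKA_CONF_new_kafka='{"api.version.request":true}'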

The content of each environment variable is a JSON object that tells the rdkafka library whether to query the Kafka cluster for the API version it supports:

'{"api.version.request":false}'

To set the environment variable in BASH, use the export command:

$ export VERTICA_RDKAFKA_CONF='{"api.version.request":false}'

If you want the setting to affect just a cluster named kafka_weblog, use this command:

$ export VERTICA_RDKAFKA_CONF_kafka_weblog='{"api.version.request":false}'

You can add the command to any of the common user environment configuration files, such as ~/.bash_profile, or to system-wide files such as /etc/profile. You must ensure that the same setting is used for all users who may run the vkconfig script, including any calls made by daemon processes such as init. You can also directly include this command in any script you use to configure or start your scheduler.

Disabling the API version request when directly loading messages

You can disable the API request when you directly call KafkaSource to load messages by using the kafka_conf parameter:

=> CREATE FLEX TABLE iot_data();
CREATE TABLE
=> COPY public.iot_data SOURCE KafkaSource(stream='iot_json|0|-2',
                                           brokers='kafka-01.example.com:9092',
                                           stop_on_eof=True,
                                           kafka_conf='{"api.version.request":false}')
        PARSER KafkaJSONParser();
 Rows Loaded
-------------
        5000
(1 row)
