Both Vertica and Kafka have settings you can use to optimize your streaming data loads. The topics in this section explain these settings.
Configuring Vertica and Kafka
- 1: Kafka and Vertica configuration settings
- 2: Directly setting Kafka library options
- 3: Configuring Vertica for Apache Kafka version 0.9 and earlier
1 - Kafka and Vertica configuration settings
You can fine-tune configuration settings for Vertica and Kafka to optimize performance.
The Vertica and Kafka integration uses rdkafka version 2.4.0. Unless otherwise noted, the following configuration settings use the default librdkafka configuration properties values. For instructions on how to override the default values, see Directly setting Kafka library options.
Vertica producer settings
These settings change how Vertica produces data for Kafka with the KafkaExport function and notifiers.
queue.buffering.max.messages
- Size of the Vertica producer queue. If Vertica generates too many messages too quickly, the queue can fill, resulting in dropped messages. Increasing this value consumes more memory, but reduces the chance of lost messages.
Defaults:
- KafkaExport: 1000
- Notifiers: 10000
queue.buffering.max.ms
- Frequency with which Vertica flushes the producer message queue. Lower values decrease latency at the cost of throughput. Higher values increase throughput, but can cause the producer queue (set by queue.buffering.max.messages) to fill more frequently, resulting in dropped messages.
Default: 100 ms
message.max.bytes
- Maximum size of a Kafka protocol request message batch. This value should be the same on your sources, brokers, and producers.
message.send.max.retries
- Number of attempts the producer makes to deliver the message to a broker. Higher values increase the chance of success.
retry.backoff.ms
- Interval Vertica waits before resending a failed message.
request.required.acks
- Number of broker replica acknowledgments Kafka requires before it considers message delivery successful. Requiring acknowledgments increases latency. Removing acknowledgments increases the risk of message loss.
request.timeout.ms
- Interval that the producer waits for a response from the broker. Broker response time is affected by server load and the number of message acknowledgments you require. Higher values increase latency.
compression.type
- Compression algorithm used to encode data before sending it to a broker. Compression helps to reduce the network footprint of your Vertica producers and to use disk space more efficiently. Vertica supports gzip and snappy.
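As a rough illustration of how queue.buffering.max.messages and queue.buffering.max.ms interact, the following shell sketch estimates the message rate at which the producer queue would fill within a single flush interval. It assumes a deliberately simplified model in which the queue empties completely at every flush, and it ignores retries, acknowledgments, and broker latency, so treat the result as an order-of-magnitude guide and not as a measured limit. The function name fill_rate is hypothetical.

```shell
#!/bin/sh
# fill_rate QUEUE_SIZE FLUSH_MS
# Prints the messages-per-second rate at which a queue of QUEUE_SIZE messages
# would fill within one flush interval of FLUSH_MS milliseconds.
# Simplified model (assumption): the queue drains fully at each flush.
fill_rate() {
    queue_size=$1
    flush_ms=$2
    echo $(( $queue_size * 1000 / $flush_ms ))
}

# Defaults listed above: 100 ms flush interval,
# queue sizes of 1000 (KafkaExport) and 10000 (notifiers).
echo "KafkaExport: $(fill_rate 1000 100) messages/s"
echo "Notifiers: $(fill_rate 10000 100) messages/s"
```

Under this model, halving queue.buffering.max.ms or doubling queue.buffering.max.messages doubles the rate the queue can absorb before it fills.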
Kafka broker settings
Kafka brokers receive messages from producers and distribute them among Kafka consumers. Configure these settings on the brokers themselves. These settings function independently of your producer and consumer settings. For detailed information on Apache Kafka broker settings, refer to the Apache Kafka documentation.
message.max.bytes
- Maximum size of a Kafka protocol request message batch. This value should be the same on your sources, brokers, and producers.
num.io.threads
- Number of threads the broker uses to process requests, which can include disk I/O. More threads can increase your concurrency.
num.network.threads
- Number of network threads the broker uses to accept network requests. More threads can increase your concurrency.
Vertica consumer settings
The following setting changes how Vertica acts when it consumes data from Kafka. You can set this value using the kafka_conf parameter on the KafkaSource UDL when directly executing a COPY statement. For schedulers, use the --message_max_bytes setting in the scheduler tool.
message.max.bytes
- Maximum size of a Kafka protocol request message batch. Set this to a value high enough to prevent the overhead of fetching batches of messages from interfering with loading data. Defaults to 24MB for newly created load specs.
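If you need the default expressed in bytes, for example to compare it with a broker's message.max.bytes value, the conversion can be sketched in shell. This assumes that 24MB means 24 * 1024 * 1024 bytes; check the value stored in your own load spec before relying on it. The function name mb_to_bytes is hypothetical.

```shell
#!/bin/sh
# Convert a size in MB to bytes.
# Assumption: 1 MB is taken to be 1024 * 1024 bytes.
mb_to_bytes() {
    echo $(( $1 * 1024 * 1024 ))
}

# The 24MB default mentioned above, expressed in bytes.
mb_to_bytes 24
```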
2 - Directly setting Kafka library options
Vertica relies on the open source rdkafka library to communicate with Apache Kafka. This library contains many configuration properties that control how Vertica and Kafka interact. You set the most common rdkafka library properties through the settings in the vkconfig utility and the Kafka integration functions.
There are some rdkafka properties that you cannot directly set from within Vertica. Under normal circumstances, you do not need to change them. However, if you find that you need to set a specific rdkafka property that is not directly available from Vertica, you can set the property directly through one of the following options or parameters:
- kafka_conf: sets global configuration options. This is available as a vkconfig option and a function parameter.
- kafka_topic_conf: sets topic-level configuration options. This is available as a parameter in the KafkaExport and KafkaSource functions.
- kafka_conf_secret: sets options that accept sensitive information, such as passwords. This is available as a vkconfig option and a function parameter. Values passed to kafka_conf_secret are not logged or stored in system tables.
For a list of global and topic rdkafka configuration properties, see the rdkafka repository on GitHub.
Important
Arbitrarily setting configuration properties might result in errors or unpredictable behavior. If you encounter a problem loading messages after setting an rdkafka property using the configuration options that Vertica provides, roll back your change to see if that was the source of the problem.
To prevent confusion, never use one of the kafka_* options or parameters to set an rdkafka library property if you can set the property with an existing scheduler option. For example, do not use the kafka_conf option to set the Kafka message.max.bytes setting. Instead, use the load-spec tool --message-max-bytes option.
Setting vkconfig properties
The kafka_conf option sets rdkafka properties when you automatically consume data from Kafka with the vkconfig scheduler. You can set its values in the following ways, listed in descending order of precedence:
- The --kafka_conf option of the vkconfig utility. This option can be set in the cluster, source, launch, and sync tools. Note that the setting applies only to the vkconfig utility call in which you supply it; it does not carry over to other vkconfig utility calls. For example, if you need to supply an option to the cluster and source tools, you must supply the kafka_conf option to both of them.
- The Linux environment variable VERTICA_RDKAFKA_CONF_KAFKA_CLUSTER set on the host where you run the vkconfig utility. The KAFKA_CLUSTER portion of the variable name is the name of a Kafka cluster you have defined using vkconfig's cluster utility. The settings in this environment variable only affect the specific Kafka cluster you name in KAFKA_CLUSTER.
- The Linux environment variable VERTICA_RDKAFKA_CONF set on the host where you run the vkconfig utility.
Note
Setting an rdkafka property with an environment variable helps to keep your settings consistent. It is easy to forget to set the --kafka_conf option for each call to the vkconfig script.
All of these options cascade, so setting an option using the --kafka_conf argument to the cluster tool overrides the same option that was set in the environment variables.
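Because --kafka_conf does not carry over between calls, a setup script can keep the JSON string in one shell variable and pass it to every tool it invokes. The sketch below only prints the commands it would run (a dry run), so it can be tried on a host without vkconfig. The run helper and the DRY_RUN variable are hypothetical, and the cluster, source, host, and file names are placeholders.

```shell
#!/bin/sh
# One definition of the rdkafka settings, reused for every vkconfig call.
KAFKA_CONF='{"api.version.request":false}'

# Hypothetical helper: print the command when DRY_RUN=1 (the default here),
# otherwise execute it. The printed form omits shell quoting.
DRY_RUN=${DRY_RUN:-1}
run() {
    if [ "$DRY_RUN" = 1 ]; then
        printf '%s\n' "$*"
    else
        "$@"
    fi
}

# Each tool call receives the option explicitly.
run vkconfig cluster --create --cluster StreamCluster1 \
    --hosts kafka01.example.com:9092 \
    --conf myscheduler.config \
    --kafka_conf "$KAFKA_CONF"
run vkconfig source --create --cluster StreamCluster1 \
    --source web_hits \
    --conf myscheduler.config \
    --kafka_conf "$KAFKA_CONF"
```

Setting DRY_RUN=0 would execute the commands instead of printing them; review the printed commands against your own configuration first.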
Option and parameter formats
All kafka_* options and parameters accept a JSON string. You can pass configuration property settings in one of the following formats:
- One or more property/value pairs:
# vkconfig scheduler option
--kafka_conf '{"option1":value1[, "option2":value2...]}'
# function parameter
kafka_conf='{"option1":value1[, "option2":value2...]}'
- A single property with multiple values:
# vkconfig scheduler option
--kafka_conf '{"option1":"value1[;value2...]"}'
# function parameter
kafka_conf='{"option1":"value1[;value2...]"}'
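When the values come from shell variables, it can help to assemble the JSON string with printf and keep the format string in single quotes, so that the shell does not strip the double quotes that JSON requires. The following sketch builds a string in the first format (property/value pairs). The variable names are hypothetical, and the two properties are used only as sample values.

```shell
#!/bin/sh
# Sample values (hypothetical variable names).
api_version_request=false
check_crcs=true

# The format string is single-quoted, so the double quotes reach the JSON intact.
kafka_conf=$(printf '{"api.version.request":%s, "check.crcs":%s}' \
    "$api_version_request" "$check_crcs")

# Show the assembled JSON string.
printf '%s\n' "$kafka_conf"
```

When passing the result on a command line, wrap the variable in double quotes, as in --kafka_conf "$kafka_conf", so the shell hands it over as a single argument.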
Examples
The following example demonstrates disabling rdkafka's api.version.request property when manually loading messages using KafkaSource. You should always disable this property when accessing a Kafka cluster running version 0.9 or earlier. See Configuring Vertica for Apache Kafka version 0.9 and earlier for more information.
=> CREATE FLEX TABLE iot_data();
CREATE TABLE
=> COPY public.iot_data SOURCE KafkaSource(stream='iot_json|0|-2',
brokers='kafka-01.example.com:9092',
stop_on_eof=True,
kafka_conf='{"api.version.request":false}')
PARSER KafkaJSONParser();
Rows Loaded
-------------
5000
(1 row)
This example demonstrates setting two properties with a JSON string when calling the cluster tool. It disables the api.version.request property and enables CRC checks of messages from Kafka using the check.crcs property:
$ vkconfig cluster --create --cluster StreamCluster1 \
--hosts kafka01.example.com:9092,kafka02.example.com:9092 \
--conf myscheduler.config \
--kafka_conf '{"api.version.request":false, "check.crcs":true}'
The following example demonstrates setting the same properties using an environment variable:
$ export VERTICA_RDKAFKA_CONF='{"api.version.request":false, "check.crcs":true}'
$ vkconfig cluster --create --cluster StreamCluster1 \
--hosts kafka01.example.com:9092,kafka02.example.com:9092 \
--conf myscheduler.config
Important
Setting the check.crcs property is just an example. Vertica does not suggest you enable the CRC check in your schedulers under normal circumstances. It adds overhead and can result in slower performance.
3 - Configuring Vertica for Apache Kafka version 0.9 and earlier
Apache Kafka version 0.10 introduced a new feature that allows consumers to determine which version of the Kafka API the Kafka brokers support. Consumers that support this feature send an initial API version query to the Kafka broker to determine the API version to use when communicating with it. Kafka brokers running version 0.9.0 or earlier cannot respond to the API query. If the consumer does not receive a reply from the Kafka broker within a timeout period (set to 10 seconds by default), the consumer can assume the Kafka broker is running Kafka version 0.9.0 or earlier.
The Vertica integration with Kafka supports this API query feature starting in version 9.1.1. This API check can cause problems if you are connecting Vertica to a Kafka cluster running 0.9.0 or earlier. You may notice poorer performance when loading messages from Kafka, and you may experience errors because the 10-second API request timeout can cause parts of the Kafka integration feature to time out.
For example, if you run the vkconfig source utility to configure a source on a Kafka 0.9 cluster, you may get the following error:
$ vkconfig source --create --conf weblog.conf --cluster kafka_weblog --source web_hits
Exception in thread "main" com.vertica.solutions.kafka.exception.ConfigurationException:
ERROR: [[Vertica][VJDBC](5861) ERROR: Error calling processPartition() in
User Function KafkaListTopics at [/data/build-centos6/qb/buildagent/workspace/jenkins2/PrimaryBuilds/build_master/build/udx/supported/kafka/KafkaUtil.cpp:173],
error code: 0, message: Error getting metadata: [Local: Broker transport failure]]
at com.vertica.solutions.kafka.model.StreamSource.validateConfiguration(StreamSource.java:184)
at com.vertica.solutions.kafka.model.StreamSource.setFromMapAndValidate(StreamSource.java:130)
at com.vertica.solutions.kafka.model.StreamModel.<init>(StreamModel.java:89)
at com.vertica.solutions.kafka.model.StreamSource.<init>(StreamSource.java:39)
at com.vertica.solutions.kafka.cli.SourceCLI.getNewModel(SourceCLI.java:53)
at com.vertica.solutions.kafka.cli.SourceCLI.getNewModel(SourceCLI.java:15)
at com.vertica.solutions.kafka.cli.CLI.run(CLI.java:56)
at com.vertica.solutions.kafka.cli.CLI._main(CLI.java:132)
at com.vertica.solutions.kafka.cli.SourceCLI.main(SourceCLI.java:25)
Caused by: java.sql.SQLNonTransientException: [Vertica][VJDBC](5861)
ERROR: Error calling processPartition() in User Function KafkaListTopics at [/data/build-centos6/qb/buildagent/workspace/jenkins2/PrimaryBuilds/build_master/build/udx/supported/kafka/KafkaUtil.cpp:173],
error code: 0, message: Error getting metadata: [Local: Broker transport failure]
at com.vertica.util.ServerErrorData.buildException(Unknown Source)
. . .
Disabling the API version request when using the streaming job scheduler
To avoid these problems, you must disable the API version request feature in the rdkafka library that Vertica uses to communicate with Kafka. The easiest way to disable this setting when using the scheduler is to set a Linux environment variable on the host on which you run the vkconfig script. Using the environment variable ensures that each call to vkconfig includes the setting to disable the API version request. The vkconfig script checks two variables:
- VERTICA_RDKAFKA_CONF applies to all Kafka clusters that the scheduler running on the host communicates with.
- VERTICA_RDKAFKA_CONF_CLUSTER_NAME applies to just the Kafka cluster named CLUSTER_NAME. Use this variable if your scheduler communicates with several Kafka clusters, some of which are not running version 0.9 or earlier.
If you set both variables, the cluster-specific one takes precedence. This feature is useful if most of the clusters your scheduler connects to are running Kafka 0.9 or earlier, but a few run 0.10 or later. In this case, you can disable the API version check for most clusters using the VERTICA_RDKAFKA_CONF variable and re-enable the check for specific clusters using VERTICA_RDKAFKA_CONF_CLUSTER_NAME.
If only a few of your Kafka clusters run version 0.9 or earlier, you can set the cluster-specific variables for them and leave the default values in place for the majority of your clusters.
The content of the environment variable is a JSON object that tells the rdkafka library whether to query the Kafka cluster for the API version it supports:
'{"api.version.request":false}'
To set the environment variable in Bash, use the export command:
$ export VERTICA_RDKAFKA_CONF='{"api.version.request":false}'
To make the setting affect only a cluster named kafka_weblog, the command is:
$ export VERTICA_RDKAFKA_CONF_kafka_weblog='{"api.version.request":false}'
You can add the command to any of the common user environment configuration files such as ~/.bash_profile or the system-wide files such as /etc/profile. You must ensure that the same setting is used for all users who may run the vkconfig script, including any calls made by daemon processes such as init. You can also directly include this command in any script you use to configure or start your scheduler.
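The precedence between the two variables can be sketched as a small shell function. This is only an illustration of the rule described in this section, not how vkconfig is implemented: it treats each variable's value as a whole and does not model any per-property merging. The function name effective_conf and the cluster name kafka_new are hypothetical.

```shell
#!/bin/sh
# Most clusters run Kafka 0.9 or earlier: disable the API version request globally.
export VERTICA_RDKAFKA_CONF='{"api.version.request":false}'
# One newer cluster (hypothetically named kafka_new) re-enables the request.
export VERTICA_RDKAFKA_CONF_kafka_new='{"api.version.request":true}'

# effective_conf CLUSTER_NAME
# Prints the cluster-specific value if that variable is set and non-empty,
# otherwise falls back to the global VERTICA_RDKAFKA_CONF value.
effective_conf() {
    cluster_value=$(printenv "VERTICA_RDKAFKA_CONF_$1" || true)
    if [ -n "$cluster_value" ]; then
        printf '%s\n' "$cluster_value"
    else
        printf '%s\n' "${VERTICA_RDKAFKA_CONF:-}"
    fi
}

effective_conf kafka_new      # cluster-specific variable applies
effective_conf kafka_weblog   # no cluster-specific variable, global one applies
```

A check like this can be run in the same environment as the scheduler, including under the account a daemon uses, to confirm that the expected variable is visible there.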
Disabling the API version request when directly loading messages
You can disable the API request when you directly call KafkaSource to load messages by using the kafka_conf parameter:
=> CREATE FLEX TABLE iot_data();
CREATE TABLE
=> COPY public.iot_data SOURCE KafkaSource(stream='iot_json|0|-2',
brokers='kafka-01.example.com:9092',
stop_on_eof=True,
kafka_conf='{"api.version.request":false}')
PARSER KafkaJSONParser();
Rows Loaded
-------------
5000
(1 row)