Both Vertica and Kafka have settings you can use to optimize your streaming data loads. The topics in this section explain these settings.
Configuring Vertica and Kafka
- 1: Kafka and Vertica configuration settings
- 2: Directly setting Kafka library options
- 3: Configuring Vertica for Apache Kafka version 0.9 and earlier
- 4: Changes to the message.max.bytes setting in Kafka version 0.11 and later
1 - Kafka and Vertica configuration settings
You can fine-tune configuration settings for Vertica and Kafka that optimize performance.
The Vertica and Kafka integration uses rdkafka version 0.11.6. Unless otherwise noted, the following configuration settings use the default librdkafka configuration property values. For instructions on how to override the default values, see Directly setting Kafka library options.
Vertica producer settings
These settings change how Vertica produces data for Kafka with the KafkaExport function and notifiers.
queue.buffering.max.messages
- Size of the Vertica producer queue. If Vertica generates too many messages too quickly, the queue can fill, resulting in dropped messages. Increasing this value consumes more memory, but reduces the chance of lost messages.
Defaults:
- KafkaExport: 1000
- Notifiers: 10000
queue.buffering.max.ms
- Frequency with which Vertica flushes the producer message queue. Lower values decrease latency at the cost of throughput. Higher values increase throughput, but can cause the producer queue (set by queue.buffering.max.messages) to fill more frequently, resulting in dropped messages.
Default: 100 ms
message.max.bytes
- Maximum size of a Kafka protocol request message batch. This value should be the same on your sources, brokers, and producers.
message.send.max.retries
- Number of attempts the producer makes to deliver the message to a broker. Higher values increase the chance of success.
retry.backoff.ms
- Interval Vertica waits before resending a failed message.
request.required.acks
- Number of broker replica acknowledgments Kafka requires before it considers message delivery successful. Requiring acknowledgments increases latency. Removing acknowledgments increases the risk of message loss.
request.timeout.ms
- Interval that the producer waits for a response from the broker. Broker response time is affected by server load and the number of message acknowledgments you require.
Higher values increase latency.
compression.type
- Compression algorithm used to encode data before sending it to a broker. Compression helps to reduce the network footprint of your Vertica producers and increase disk utilization. Vertica supports gzip and snappy.
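As a sketch of how these producer settings can be applied, the kafka_conf parameter of KafkaExport accepts them directly (see Directly setting Kafka library options). The topic and table names below are illustrative, and passing NULL for the partition and key columns is assumed to let Kafka assign partitions:

```
=> SELECT KafkaExport(NULL, NULL, message
       USING PARAMETERS brokers='kafka01.example.com:9092',
       topic='export_topic',
       kafka_conf='{"queue.buffering.max.messages":20000, "queue.buffering.max.ms":50}')
   OVER () FROM export_messages;
```

Here, raising queue.buffering.max.messages consumes more memory but reduces the chance of dropped messages when Vertica produces data faster than the queue drains.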
Kafka broker settings
Kafka brokers receive messages from producers and distribute them among Kafka consumers. Configure these settings on the brokers themselves. These settings function independently of your producer and consumer settings. For detailed information on Apache Kafka broker settings, refer to the Apache Kafka documentation.
message.max.bytes
- Maximum size of a Kafka protocol request message batch. This value should be the same on your sources, brokers, and producers.
num.io.threads
- Number of threads the broker uses to process requests, which can include disk I/O. More threads can increase your concurrency.
num.network.threads
- Number of network threads the broker uses to accept network requests. More threads can increase your concurrency.
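These broker-side values are set in each broker's server.properties file rather than through Vertica. A minimal fragment, with purely illustrative values, might look like:

```
# Illustrative server.properties fragment; tune values for your workload
message.max.bytes=25165824
num.network.threads=3
num.io.threads=8
```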
Vertica consumer settings
The following settings change how Vertica acts when it consumes data from Kafka. You can set this value using the kafka_conf parameter on the KafkaSource UDL when directly executing a COPY statement. For schedulers, use the --message-max-bytes option in the scheduler's load-spec tool.
message.max.bytes
- Maximum size of a Kafka protocol request message batch. Set this value high enough to prevent the overhead of fetching message batches from interfering with loading data. Defaults to 24MB for newly-created load specs.
Important
The meaning of this setting changed between Kafka version 0.10 and 0.11. If you have a scheduler that uses Vertica version 9.1.0 or earlier, see Changes to the message.max.bytes setting in Kafka version 0.11 and later for settings that you might need to change.
2 - Directly setting Kafka library options
Vertica relies on the open source rdkafka library to communicate with Apache Kafka. This library contains many options for controlling how Vertica and Kafka interact. You set the most common rdkafka library options through the settings in the vkconfig utility and the Kafka integration functions such as KafkaSource.
There are some rdkafka settings that cannot be directly set from within Vertica. Under normal circumstances, you do not need to change them. However, if you find that you need to set a specific rdkafka setting that is not directly available from Vertica, you can pass options directly to the rdkafka library through the kafka_conf option.
The kafka_conf argument is supported when using a scheduler to load data from Kafka. You can set the values in the following ways (listed in order of lower to higher precedence):

- The Linux environment variable VERTICA_RDKAFKA_CONF set on the host where you run the vkconfig utility.
- The Linux environment variable VERTICA_RDKAFKA_CONF_KAFKA_CLUSTER set on the host where you run the vkconfig utility. The KAFKA_CLUSTER portion of the variable name is the name of a Kafka cluster you have defined using vkconfig's cluster utility. The settings in this environment variable only affect the specific Kafka cluster you name in KAFKA_CLUSTER.
- The --kafka_conf option of the vkconfig utility. This option can be set in the cluster, source, launch, and sync tools. Note that the setting applies only to that vkconfig utility call; it does not carry over to other vkconfig utility calls. For example, if you need to supply an option to the cluster and source tools, you must supply the kafka_conf option to both of them.
Note

Using an environment variable to set your rdkafka options helps to keep your settings consistent. It is easy to forget to set the --kafka_conf option for each call to the vkconfig script.

All of these options cascade, so setting an option using the --kafka_conf argument to the cluster tool overrides the same option that was set in the environment variables.
You can also set rdkafka options directly when calling KafkaExport, KafkaSource, and several other Kafka integration functions. These functions accept a parameter named kafka_conf.
The kafka_conf option settings
The kafka_conf vkconfig option accepts a JSON object with settings in the following formats:
- One or more option/value pairs:
  --kafka_conf '{"option1":value1[, "option2":value2...]}'
- A single option with multiple values:
  --kafka_conf '{"option1":"value1[;value2...]"}'
See the rdkafka project on GitHub for a list of the configuration options supported by the rdkafka library.
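vkconfig passes this JSON through to rdkafka as-is, so a malformed object may only surface later as a confusing load error. One simple pre-flight check, assuming python3 is available on the scheduler host, is to parse the JSON before using it:

```shell
# Validate the JSON intended for --kafka_conf before calling vkconfig.
KAFKA_CONF='{"api.version.request":false, "check.crcs":true}'
if echo "$KAFKA_CONF" | python3 -m json.tool > /dev/null; then
    echo "kafka_conf JSON is valid"
else
    echo "kafka_conf JSON is malformed" >&2
fi
```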
Important
Arbitrarily setting options via kafka_conf can result in errors or unpredictable behavior. If you encounter a problem loading messages after setting an rdkafka option using the kafka_conf option, roll back your change to see if that was the source of the problem. To prevent confusion, never set options via the kafka_conf parameter that can be set directly through scheduler options. For example, do not use the kafka_conf option to set Kafka's message.max.bytes setting. Instead, use the load-spec tool's --message-max-bytes option.
Example
The following example demonstrates disabling rdkafka's api.version.request option when manually loading messages using KafkaSource. You should always disable this option when accessing a Kafka cluster running version 0.9 or earlier. See Configuring Vertica for Apache Kafka version 0.9 and earlier for more information.
=> CREATE FLEX TABLE iot_data();
CREATE TABLE
=> COPY public.iot_data SOURCE KafkaSource(stream='iot_json|0|-2',
brokers='kafka-01.example.com:9092',
stop_on_eof=True,
kafka_conf='{"api.version.request":false}')
PARSER KafkaJSONParser();
Rows Loaded
-------------
5000
(1 row)
This example demonstrates setting two options with a JSON object when calling the cluster tool. It disables the api.version.request option and enables CRC checks of messages from Kafka using the check.crcs option:
$ vkconfig cluster --create --cluster StreamCluster1 \
--hosts kafka01.example.com:9092,kafka02.example.com:9092 \
--conf myscheduler.config \
--kafka_conf '{"api.version.request":false, "check.crcs":true}'
The following example demonstrates setting the same options using an environment variable:
$ export VERTICA_RDKAFKA_CONF='{"api.version.request":false, "check.crcs":true}'
$ vkconfig cluster --create --cluster StreamCluster1 \
--hosts kafka01.example.com:9092,kafka02.example.com:9092 \
--conf myscheduler.config
Important
Setting the check.crcs option is just an example. Vertica does not suggest you enable the CRC check in your schedulers under normal circumstances. It adds additional overhead and can result in slower performance.
3 - Configuring Vertica for Apache Kafka version 0.9 and earlier
Apache Kafka version 0.10 introduced a feature that allows consumers to determine which version of the Kafka API the Kafka brokers support. Consumers that support this feature send an initial API version query to the Kafka broker to determine the API version to use when communicating with it. Kafka brokers running version 0.9.0 or earlier cannot respond to this query. If the consumer does not receive a reply from the Kafka broker within a timeout period (10 seconds by default), the consumer assumes the Kafka broker is running Kafka version 0.9.0 or earlier.
The Vertica integration with Kafka supports this API query feature starting in version 9.1.1. This API check can cause problems if you are connecting Vertica to a Kafka cluster running 0.9.0 or earlier. You may notice poorer performance when loading messages from Kafka and may experience errors, because the 10-second API request timeout can cause parts of the Kafka integration feature to time out and report errors.
For example, if you run the vkconfig source utility to configure a source on a Kafka 0.9 cluster, you may get the following error:
$ vkconfig source --create --conf weblog.conf --cluster kafka_weblog --source web_hits
Exception in thread "main" com.vertica.solutions.kafka.exception.ConfigurationException:
ERROR: [[Vertica][VJDBC](5861) ERROR: Error calling processPartition() in
User Function KafkaListTopics at [/data/build-centos6/qb/buildagent/workspace/jenkins2/PrimaryBuilds/build_master/build/udx/supported/kafka/KafkaUtil.cpp:173],
error code: 0, message: Error getting metadata: [Local: Broker transport failure]]
at com.vertica.solutions.kafka.model.StreamSource.validateConfiguration(StreamSource.java:184)
at com.vertica.solutions.kafka.model.StreamSource.setFromMapAndValidate(StreamSource.java:130)
at com.vertica.solutions.kafka.model.StreamModel.<init>(StreamModel.java:89)
at com.vertica.solutions.kafka.model.StreamSource.<init>(StreamSource.java:39)
at com.vertica.solutions.kafka.cli.SourceCLI.getNewModel(SourceCLI.java:53)
at com.vertica.solutions.kafka.cli.SourceCLI.getNewModel(SourceCLI.java:15)
at com.vertica.solutions.kafka.cli.CLI.run(CLI.java:56)
at com.vertica.solutions.kafka.cli.CLI._main(CLI.java:132)
at com.vertica.solutions.kafka.cli.SourceCLI.main(SourceCLI.java:25)
Caused by: java.sql.SQLNonTransientException: [Vertica][VJDBC](5861)
ERROR: Error calling processPartition() in User Function KafkaListTopics at [/data/build-centos6/qb/buildagent/workspace/jenkins2/PrimaryBuilds/build_master/build/udx/supported/kafka/KafkaUtil.cpp:173],
error code: 0, message: Error getting metadata: [Local: Broker transport failure]
at com.vertica.util.ServerErrorData.buildException(Unknown Source)
. . .
Disabling the API version request when using the streaming job scheduler
To avoid these problems, you must disable the API version request feature in the rdkafka library that Vertica uses to communicate with Kafka. The easiest way to disable this setting when using the scheduler is to set a Linux environment variable on the host on which you run the vkconfig script. Using the environment variable ensures that each call to vkconfig includes the setting to disable the API version request. The vkconfig script checks two variables:
- VERTICA_RDKAFKA_CONF applies to all Kafka clusters that the scheduler running on the host communicates with.
- VERTICA_RDKAFKA_CONF_CLUSTER_NAME applies to just the Kafka cluster named CLUSTER_NAME. Use this variable if your scheduler communicates with several Kafka clusters, some of which are not running version 0.9 or earlier.
If you set both variables, the cluster-specific one takes precedence. This feature is useful if most of the clusters your scheduler connects to are running Kafka 0.9 or earlier, but a few run 0.10 or later. In this case, you can disable the API version check for most clusters using the VERTICA_RDKAFKA_CONF variable and re-enable the check for specific clusters using VERTICA_RDKAFKA_CONF_CLUSTER_NAME.
If just a few of your Kafka clusters run version 0.9 or earlier, you can just set the cluster-specific variables for them, and leave the default values in place for the majority of your clusters.
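The lookup order can be sketched in the shell. The cluster name kafka_weblog is illustrative; the parameter expansion below mirrors how the cluster-specific variable, when set, overrides the general one:

```shell
export VERTICA_RDKAFKA_CONF='{"api.version.request":false}'
export VERTICA_RDKAFKA_CONF_kafka_weblog='{"api.version.request":true}'

# The cluster-specific value wins when both variables are set:
EFFECTIVE="${VERTICA_RDKAFKA_CONF_kafka_weblog:-$VERTICA_RDKAFKA_CONF}"
echo "$EFFECTIVE"
# → {"api.version.request":true}
```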
The content of the environment variable is a JSON object that tells the rdkafka library whether to query the Kafka cluster for the API version it supports:
'{"api.version.request":false}'
To set the environment variable in BASH, use the export command:
$ export VERTICA_RDKAFKA_CONF='{"api.version.request":false}'
If you want the setting to affect just a cluster named kafka_weblog, use this command:
$ export VERTICA_RDKAFKA_CONF_kafka_weblog='{"api.version.request":false}'
You can add the command to any of the common user environment configuration files, such as ~/.bash_profile, or to system-wide files such as /etc/profile. You must ensure that the same setting is used for all users who may run the vkconfig script, including any calls made by daemon processes such as init. You can also directly include this command in any script you use to configure or start your scheduler.
Disabling the API version request when directly loading messages
You can disable the API request when you directly call KafkaSource to load messages by using the kafka_conf parameter:
=> CREATE FLEX TABLE iot_data();
CREATE TABLE
=> COPY public.iot_data SOURCE KafkaSource(stream='iot_json|0|-2',
brokers='kafka-01.example.com:9092',
stop_on_eof=True,
kafka_conf='{"api.version.request":false}')
PARSER KafkaJSONParser();
Rows Loaded
-------------
5000
(1 row)
4 - Changes to the message.max.bytes setting in Kafka version 0.11 and later
In Kafka version 0.10 and earlier, the message.max.bytes setting configured the maximum allowable size for an individual message. Starting in version 0.11, Kafka began grouping messages into batches. This version changed the meaning of this setting to be the largest allowable size of a message batch.
This change can have a significant impact on how fast your Vertica database loads messages from Kafka 0.11 and later. If you created your streaming job scheduler using Vertica version 9.1.0 or earlier, you might find that its load spec's message.max.bytes value is set too low. Upgrading your scheduler does not adjust this setting automatically.
To prevent slow throughput when using Vertica with Kafka 0.11 and later, manually adjust the message.max.bytes setting in the scheduler's load spec. Testing by Vertica suggests the best value for this setting is 25165824 (24 × 1024 × 1024) bytes.
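That figure is simply 24 MiB expressed in bytes; you can verify the arithmetic in the shell:

```shell
# 24 MiB in bytes
echo $((24 * 1024 * 1024))
# → 25165824
```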
The following example demonstrates updating the message.max.bytes setting in the load spec named weblog_load of an existing scheduler that is defined using the configuration file named weblog.conf:
$ /opt/vertica/packages/kafka/bin/vkconfig load-spec --update \
--load-spec weblog_load --conf weblog.conf --message-max-bytes 25165824