Configuring Vertica for Apache Kafka version 0.9 and earlier

Apache Kafka version 0.10 introduced a new feature that allows consumers to determine which version of the Kafka API the Kafka brokers support.

Apache Kafka version 0.10 introduced a new feature that allows consumers to determine which version of the Kafka API the Kafka brokers support. Consumers that support this feature send an initial API version query to the Kafka broker to determine the API version to use when communicating with them. Kafka brokers running version 0.9.0 or earlier cannot respond to the API query. If the consumer does not receive a reply from the Kafka broker within a timeout period (set to 10 seconds by default) the consumer can assume the Kafka broker is running Kafka version 0.9.0 or earlier.

The Vertica integration with Kafka supports this API query feature starting in version 9.1.1. This API check can cause problems if you are connecting Vertica to a Kafka cluster running 0.9.0 or earlier. You may notice poorer performance loading messages from Kafka and may experience errors as the 10 second API request timeout can cause parts of the Kafka integration feature to time out and report errors.

For example, if you run the vkconfig source utility to configure a source on a Kafka 0.9 cluster, you may get the following error:

$ vkconfig source --create --conf weblog.conf --cluster kafka_weblog --source web_hits
Exception in thread "main" com.vertica.solutions.kafka.exception.ConfigurationException:
ERROR: [[Vertica][VJDBC](5861) ERROR: Error calling processPartition() in
User Function KafkaListTopics at [/data/build-centos6/qb/buildagent/workspace/jenkins2/PrimaryBuilds/build_master/build/udx/supported/kafka/KafkaUtil.cpp:173],
error code: 0, message: Error getting metadata: [Local: Broker transport failure]]
at com.vertica.solutions.kafka.model.StreamSource.validateConfiguration(StreamSource.java:184)
at com.vertica.solutions.kafka.model.StreamSource.setFromMapAndValidate(StreamSource.java:130)
at com.vertica.solutions.kafka.model.StreamModel.<init>(StreamModel.java:89)
at com.vertica.solutions.kafka.model.StreamSource.<init>(StreamSource.java:39)
at com.vertica.solutions.kafka.cli.SourceCLI.getNewModel(SourceCLI.java:53)
at com.vertica.solutions.kafka.cli.SourceCLI.getNewModel(SourceCLI.java:15)
at com.vertica.solutions.kafka.cli.CLI.run(CLI.java:56)
at com.vertica.solutions.kafka.cli.CLI._main(CLI.java:132)
at com.vertica.solutions.kafka.cli.SourceCLI.main(SourceCLI.java:25)
Caused by: java.sql.SQLNonTransientException: [Vertica][VJDBC](5861)
ERROR: Error calling processPartition() in User Function KafkaListTopics at [/data/build-centos6/qb/buildagent/workspace/jenkins2/PrimaryBuilds/build_master/build/udx/supported/kafka/KafkaUtil.cpp:173],
error code: 0, message: Error getting metadata: [Local: Broker transport failure]
at com.vertica.util.ServerErrorData.buildException(Unknown Source)
                           . . .

Disabling the API version request when using the streaming job scheduler

To avoid these problems, you must disable the API version request feature in the rdkafka library that Vertica uses to communicate with Kafka. The easiest way to disable this setting when using the scheduler is to set a Linux environment variable on the host on which you run the vkconfig script. Using the environment variable ensures that each call to vkconfig includes the setting to disable the API version request. The vkconfig script checks two variables:

  • VERTICA_RDKAFKA_CONF applies to all Kafka clusters that the scheduler running on the host communicates with.

  • VERTICA_RDKAFKA_CONF_CLUSTER_NAME applies to just the Kafka cluster named CLUSTER_NAME. Use this variable if your scheduler communicates with several Kafka clusters, some of which are not running version 0.9 or earlier.

If you set both variables, the cluster-specific one takes precedence. This feature is useful if most of the clusters your scheduler connects to are running Kafka 0.9 or earlier, but a few run 0.10 or later. This case, you can disable the API version check for most clusters using the VERTICA_RDKAFKA_CONF variable and re-enable the check for specific clusters using VERTICA_RDKAFKA_CONF_CLUSTER_NAME.

If just a few of your Kafka clusters run version 0.9 or earlier, you can just set the cluster-specific variables for them, and leave the default values in place for the majority of your clusters.

The contents of the environment variable is a JSON object that tells the rdkafka library whether to query the Kafka cluster for the API version it supports:

'{"api.version.request":false}'

To set the environment variable in BASH, use the export command:

$ export VERTICA_RDKAFKA_CONF='{"api.version.request":false}'

If you wanted the setting to just affect a cluster named kafka_weblog, the command is:

$ export VERTICA_RDKAFKA_CONF_kafka_weblog='{"api.version.request":false}'

You can add the command to any of the common user environment configuration files such as ~/.bash_profile or the system-wide files such as /etc/profile. You must ensure that the same setting is used for all users who may run the vkconfig script, including any calls made by daemon processes such as init. You can also directly include this command in any script you use to set configure or start your scheduler.

Disabling the API version request when directly loading messages

You can disable the API request when you directly call KafkaSource to load messages by using the kafka_conf parameter:

=> CREATE FLEX TABLE iot_data();
CREATE TABLE
=> COPY public.iot_data SOURCE KafkaSource(stream='iot_json|0|-2',
                                           brokers='kafka-01.example.com:9092',
                                           stop_on_eof=True,
                                           kafka_conf='{"api.version.request":false}')
        PARSER KafkaJSONParser();
 Rows Loaded
-------------
        5000
(1 row)

See also