The following topics can help you troubleshoot issues integrating Vertica with Apache Kafka.
Troubleshooting Kafka integration issues
- 1: Using kafkacat to troubleshoot Kafka integration issues
- 2: Troubleshooting slow load speeds
- 3: Troubleshooting missing messages
1 - Using kafkacat to troubleshoot Kafka integration issues
Kafkacat is a third-party open-source utility that lets you connect to Kafka from the Linux command line. It uses the same underlying library that the Vertica integration for Apache Kafka uses to connect to Kafka. This shared library makes kafkacat a useful tool for testing and debugging your Vertica integration with Kafka.
You may find kafkacat useful for:
- Testing connectivity between your Vertica and Kafka clusters.
- Examining Kafka data for anomalies that may prevent some of it from loading into Vertica.
- Producing data for test loads into Vertica.
- Listing details about a Kafka topic.
For more information about kafkacat, see its project page on GitHub.
Running kafkacat on Vertica nodes
The kafkacat utility is bundled in the Vertica install package, so it is available on all nodes of your Vertica cluster in the /opt/vertica/packages/kafka/bin directory. This is the same directory that contains the vkconfig utility, so if you have already added that directory to your path, you can use kafkacat without specifying its full path. Otherwise, you can add the directory to your PATH environment variable using the command:
export PATH=/opt/vertica/packages/kafka/bin:$PATH
Note
On Debian and Ubuntu systems, you must tell kafkacat to use Vertica's own copy of the SSL libraries by setting the LD_LIBRARY_PATH environment variable:
$ export LD_LIBRARY_PATH=/opt/vertica/lib
If you do not set this environment variable, the kafkacat utility exits with the error:
kafkacat: error while loading shared libraries: libcrypto.so.10:
cannot open shared object file: No such file or directory
Executing kafkacat without any arguments gives you a basic help message:
$ kafkacat
Error: -b <broker,..> missing
Usage: kafkacat <options> [file1 file2 .. | topic1 topic2 ..]]
kafkacat - Apache Kafka producer and consumer tool
https://github.com/edenhill/kafkacat
Copyright (c) 2014-2015, Magnus Edenhill
Version releases/VER_8_1_RELEASE_BUILD_1_555_20170615-4931-g3fb918 (librdkafka releases/VER_8_1_RELEASE_BUILD_1_555_20170615-4931-g3fb918)
General options:
-C | -P | -L Mode: Consume, Produce or metadata List
-G <group-id> Mode: High-level KafkaConsumer (Kafka 0.9 balanced consumer groups)
Expects a list of topics to subscribe to
-t <topic> Topic to consume from, produce to, or list
-p <partition> Partition
-b <brokers,..> Bootstrap broker(s) (host[:port])
-D <delim> Message delimiter character:
a-z.. | \r | \n | \t | \xNN
Default: \n
-K <delim> Key delimiter (same format as -D)
-c <cnt> Limit message count
-X list List available librdkafka configuration properties
-X prop=val Set librdkafka configuration property.
Properties prefixed with "topic." are
applied as topic properties.
-X dump Dump configuration and exit.
-d <dbg1,...> Enable librdkafka debugging:
all,generic,broker,topic,metadata,queue,msg,protocol,cgrp,security,fetch,feature
-q Be quiet (verbosity set to 0)
-v Increase verbosity
-V Print version
Producer options:
-z snappy|gzip Message compression. Default: none
-p -1 Use random partitioner
-D <delim> Delimiter to split input into messages
-K <delim> Delimiter to split input key and message
-l Send messages from a file separated by
delimiter, as with stdin.
(only one file allowed)
-T Output sent messages to stdout, acting like tee.
-c <cnt> Exit after producing this number of messages
-Z Send empty messages as NULL messages
file1 file2.. Read messages from files.
With -l, only one file permitted.
Otherwise, the entire file contents will
be sent as one single message.
Consumer options:
-o <offset> Offset to start consuming from:
beginning | end | stored |
<value> (absolute offset) |
-<value> (relative offset from end)
-e Exit successfully when last message received
-f <fmt..> Output formatting string, see below.
Takes precedence over -D and -K.
-D <delim> Delimiter to separate messages on output
-K <delim> Print message keys prefixing the message
with specified delimiter.
-O Print message offset using -K delimiter
-c <cnt> Exit after consuming this number of messages
-Z Print NULL messages and keys as "NULL"(instead of empty)
-u Unbuffered output
Metadata options:
-t <topic> Topic to query (optional)
Format string tokens:
%s Message payload
%S Message payload length (or -1 for NULL)
%R Message payload length (or -1 for NULL) serialized
as a binary big endian 32-bit signed integer
%k Message key
%K Message key length (or -1 for NULL)
%t Topic
%p Partition
%o Message offset
\n \r \t Newlines, tab
\xXX \xNNN Any ASCII character
Example:
-f 'Topic %t [%p] at offset %o: key %k: %s\n'
Consumer mode (writes messages to stdout):
kafkacat -b <broker> -t <topic> -p <partition>
or:
kafkacat -C -b ...
High-level KafkaConsumer mode:
kafkacat -b <broker> -G <group-id> topic1 top2 ^aregex\d+
Producer mode (reads messages from stdin):
... | kafkacat -b <broker> -t <topic> -p <partition>
or:
kafkacat -P -b ...
Metadata listing:
kafkacat -L -b <broker> [-t <topic>]
Testing connectivity to a Kafka cluster and getting metadata
One basic troubleshooting step you often need to perform is verifying that your Vertica nodes can connect to the Kafka cluster. Successfully executing almost any kafkacat command proves that the Vertica node you are logged into can reach the Kafka cluster. A simple way to verify connectivity is to get the metadata for all of the topics the Kafka cluster has defined. The following example demonstrates using kafkacat's metadata listing command to connect to the broker named kafka01 running on port 6667 (the Kafka broker port used by Hortonworks Hadoop clusters).
$ kafkacat -L -b kafka01:6667
Metadata for all topics (from broker -1: kafka01:6667/bootstrap):
2 brokers:
broker 1001 at kafka03.example.com:6667
broker 1002 at kafka01.example.com:6667
4 topics:
topic "iot-data" with 3 partitions:
partition 2, leader 1002, replicas: 1002, isrs: 1002
partition 1, leader 1001, replicas: 1001, isrs: 1001
partition 0, leader 1002, replicas: 1002, isrs: 1002
topic "__consumer_offsets" with 50 partitions:
partition 23, leader 1001, replicas: 1002,1001, isrs: 1001,1002
partition 41, leader 1001, replicas: 1002,1001, isrs: 1001,1002
partition 32, leader 1002, replicas: 1002,1001, isrs: 1001,1002
partition 8, leader 1002, replicas: 1002,1001, isrs: 1001,1002
partition 17, leader 1001, replicas: 1002,1001, isrs: 1001,1002
partition 44, leader 1002, replicas: 1002,1001, isrs: 1001,1002
partition 35, leader 1001, replicas: 1002,1001, isrs: 1001,1002
partition 26, leader 1002, replicas: 1002,1001, isrs: 1001,1002
partition 11, leader 1001, replicas: 1002,1001, isrs: 1001,1002
partition 29, leader 1001, replicas: 1002,1001, isrs: 1001,1002
partition 38, leader 1002, replicas: 1002,1001, isrs: 1001,1002
partition 47, leader 1001, replicas: 1002,1001, isrs: 1001,1002
partition 20, leader 1002, replicas: 1002,1001, isrs: 1001,1002
partition 2, leader 1002, replicas: 1002,1001, isrs: 1001,1002
partition 5, leader 1001, replicas: 1002,1001, isrs: 1001,1002
partition 14, leader 1002, replicas: 1002,1001, isrs: 1001,1002
partition 46, leader 1002, replicas: 1002,1001, isrs: 1001,1002
partition 49, leader 1001, replicas: 1002,1001, isrs: 1001,1002
partition 40, leader 1002, replicas: 1002,1001, isrs: 1001,1002
partition 4, leader 1002, replicas: 1002,1001, isrs: 1001,1002
partition 13, leader 1001, replicas: 1002,1001, isrs: 1001,1002
partition 22, leader 1002, replicas: 1002,1001, isrs: 1001,1002
partition 31, leader 1001, replicas: 1002,1001, isrs: 1001,1002
partition 16, leader 1002, replicas: 1002,1001, isrs: 1001,1002
partition 7, leader 1001, replicas: 1002,1001, isrs: 1001,1002
partition 43, leader 1001, replicas: 1002,1001, isrs: 1001,1002
partition 25, leader 1001, replicas: 1002,1001, isrs: 1001,1002
partition 34, leader 1002, replicas: 1002,1001, isrs: 1001,1002
partition 10, leader 1002, replicas: 1002,1001, isrs: 1001,1002
partition 37, leader 1001, replicas: 1002,1001, isrs: 1001,1002
partition 1, leader 1001, replicas: 1002,1001, isrs: 1001,1002
partition 19, leader 1001, replicas: 1002,1001, isrs: 1001,1002
partition 28, leader 1002, replicas: 1002,1001, isrs: 1001,1002
partition 45, leader 1001, replicas: 1002,1001, isrs: 1001,1002
partition 36, leader 1002, replicas: 1002,1001, isrs: 1001,1002
partition 27, leader 1001, replicas: 1002,1001, isrs: 1001,1002
partition 9, leader 1001, replicas: 1002,1001, isrs: 1001,1002
partition 18, leader 1002, replicas: 1002,1001, isrs: 1001,1002
partition 21, leader 1001, replicas: 1002,1001, isrs: 1001,1002
partition 48, leader 1002, replicas: 1002,1001, isrs: 1001,1002
partition 12, leader 1002, replicas: 1002,1001, isrs: 1001,1002
partition 3, leader 1001, replicas: 1002,1001, isrs: 1001,1002
partition 30, leader 1002, replicas: 1002,1001, isrs: 1001,1002
partition 39, leader 1001, replicas: 1002,1001, isrs: 1001,1002
partition 15, leader 1001, replicas: 1002,1001, isrs: 1001,1002
partition 42, leader 1002, replicas: 1002,1001, isrs: 1001,1002
partition 24, leader 1002, replicas: 1002,1001, isrs: 1001,1002
partition 33, leader 1001, replicas: 1002,1001, isrs: 1001,1002
partition 6, leader 1002, replicas: 1002,1001, isrs: 1001,1002
partition 0, leader 1002, replicas: 1002,1001, isrs: 1001,1002
topic "web_hits" with 1 partitions:
partition 0, leader 1001, replicas: 1001, isrs: 1001
topic "ambari_kafka_service_check" with 1 partitions:
partition 0, leader 1002, replicas: 1002, isrs: 1002
You can also use this output to verify the topics defined by your Kafka cluster, as well as the number of partitions in each topic. You need this information when copying data between Kafka and Vertica.
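If you are only interested in a single topic, you can limit the metadata listing to that topic with the -t option. A minimal sketch, reusing the web_hits topic and broker from the previous example:
$ kafkacat -L -b kafka01:6667 -t web_hits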
Retrieving messages from a Kafka topic
When you are troubleshooting issues with streaming messages from Kafka into Vertica, you often want to look at the raw data that Kafka sent. For example, you may want to verify that the messages are in the format that you expect. Or, you may want to review specific messages to see whether some of them were not in the right format for Vertica to parse. You can use kafkacat to read messages from a topic using its consume command (-C). At the very least, you must pass kafkacat the brokers (-b argument) and the topic you want to read from (-t argument). You can also choose to read messages from a specific offset (-o) and partition (-p). You will usually also want kafkacat to exit after completing the data read (-e) instead of continuing to wait for more messages.
This example gets the last message in the topic named web_hits. The offset argument uses a negative value, which tells kafkacat to read from the end of the topic.
$ kafkacat -C -b kafka01:6667 -t web_hits -o -1 -e
{"url": "wp-content/list/search.php", "ip": "132.74.240.52",
"date": "2018/03/28 14:12:34",
"user_agent": "Mozilla/5.0 (iPod; U; CPU iPhone OS 4_2 like
Mac OS X; sl-SI) AppleWebKit/532.22.4 (KHTML, like Gecko) Version/3.0.5
Mobile/8B117 Safari/6532.22.4"}
% Reached end of topic web_hits [0] at offset 54932: exiting
You can also read a specific range of messages by specifying an offset and a limit (-c argument). For example, you may want to look at a specific range of data to determine why Vertica could not load it. The following example reads 10 messages from the topic iot-data starting at offset 3280:
$ kafkacat -C -b kafka01:6667 -t iot-data -o 3280 -c 10 -e
63680, 19, 24.439323, 26.0128725
43510, 71, 162.319805, -37.4924025
91113, 53, 139.764857, -74.735731
88508, 14, -85.821967, -7.236280
20419, 31, -129.583988, 13.995481
79365, 79, 153.184594, 51.1583485
26283, 25, -168.911020, 35.331027
32111, 56, -157.930451, 82.2676385
56808, 17, 19.603286, -0.7698495
9118, 73, 97.365445, -74.8593245
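When you are trying to pin down a specific message that Vertica could not parse, it can help to print each message's topic, partition, and offset alongside its payload. A sketch using the -f formatting option and the format tokens shown in the help output above, with the same hypothetical broker, topic, and offsets:
$ kafkacat -C -b kafka01:6667 -t iot-data -o 3280 -c 10 -e -f 'Topic %t [%p] at offset %o: %s\n'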
Generating data for a Kafka topic
If you are preparing to stream data from a Kafka topic that is not yet active, you may want a way to stream test messages. You can then verify that the topic's messages load into Vertica without worrying that you will miss actual data.
To send data to Kafka, use kafkacat's produce command (-P). The easiest way to supply it with messages is to pipe them in via STDIN, one message per line. You can choose a specific partition for the data, or have kafkacat assign each message to a random partition by setting the partition number to -1. For example, suppose you have a file named iot_data.csv that you want to produce to random partitions of a Kafka topic named iot-data. Then you could use the following command:
$ cat iot_data.csv | kafkacat -P -p -1 -b kafka01:6667 -t iot-data
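If you do not have a data file handy, you can also pipe a few ad-hoc test messages straight into kafkacat. A sketch, assuming the same hypothetical broker and topic as above:
$ printf 'test message 1\ntest message 2\n' | kafkacat -P -p -1 -b kafka01:6667 -t iot-data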
2 - Troubleshooting slow load speeds
Here are some potential issues that could cause messages to load slowly from Kafka.
Verify you have disabled the API version check when communicating with Kafka 0.9 or earlier
If your Kafka cluster is running version 0.9 or earlier, be sure you have disabled the rdkafka library's api.version.request option. If you do not, every Vertica connection to Kafka pauses for 10 seconds until the API version request times out. Depending on the frame size of your load or other timeout settings, this delay can reduce the throughput of your data loads or even prevent messages from being loaded at all. See Configuring Vertica for Apache Kafka version 0.9 and earlier for more information.
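Because kafkacat uses the same librdkafka library, you can experiment from the command line to see whether disabling the API version request removes the connection delay. A sketch, using the -X option shown in the help output above and the hypothetical broker kafka01:
$ kafkacat -L -b kafka01:6667 -X api.version.request=false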
Eon Mode and cloud latency
Eon Mode separates compute from storage in a Vertica cluster, which can cause a small amount of latency when Vertica loads and saves data. Cloud computing infrastructure can also cause latency. This latency can eat into the frame duration for your schedulers, causing them to load less data in each frame. For this reason, you should consider increasing frame durations when loading data from Kafka in an Eon Mode database. See Vertica Eon Mode and Kafka for more information.
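As a sketch of one remedy (assuming a scheduler defined in a configuration file named scheduler.conf, which is hypothetical here), you could lengthen the scheduler's frame duration with the vkconfig scheduler utility:
$ /opt/vertica/packages/kafka/bin/vkconfig scheduler --update --conf scheduler.conf --frame-duration 00:00:30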
3 - Troubleshooting missing messages
Kafka producers that emit a high data volume might overwhelm Vertica, possibly resulting in messages expiring in Kafka before the scheduler loads them into Vertica. This is more common when Vertica performs additional processing on the loaded messages, such as text indexing.
If you see that you are missing messages from a topic with multiple partitions, consider configuring the --max-parallelism microbatch utility option. The --max-parallelism option splits a microbatch into multiple subset microbatches. This enables you to use the PLANNEDCONCURRENCY available in the scheduler's resource pool to create more scheduler threads for simultaneous loads of a single microbatch. Each node uses the resource pool's EXECUTIONPARALLELISM setting to determine the number of threads created to process partitions. Because EXECUTIONPARALLELISM threads are created per scheduler thread, using more PLANNEDCONCURRENCY per microbatch enables you to process more partitions in parallel for a single unit of work.
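For example (a sketch assuming a scheduler configuration file named scheduler.conf and a microbatch named iot-data, both hypothetical), you could allow the scheduler to split that microbatch across up to three simultaneous loads:
$ /opt/vertica/packages/kafka/bin/vkconfig microbatch --update --conf scheduler.conf --microbatch iot-data --max-parallelism 3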
For details, see Managing scheduler resources and performance.