Using kafkacat to troubleshoot Kafka integration issues

Kafkacat is a third-party open-source utility that lets you connect to Kafka from the Linux command line. It uses the same underlying library (librdkafka) that the Vertica integration for Apache Kafka uses to connect to Kafka. This shared library makes kafkacat a useful tool for testing and debugging your Vertica integration with Kafka.

You may find kafkacat useful for:

  • Testing connectivity between your Vertica and Kafka clusters.

  • Examining Kafka data for anomalies that may prevent some of it from loading into Vertica.

  • Producing data for test loads into Vertica.

  • Listing details about a Kafka topic.

For more information about kafkacat, see its project page on GitHub.

Running kafkacat on Vertica nodes

The kafkacat utility is bundled in the Vertica install package, so it is available on all nodes of your Vertica cluster in the /opt/vertica/packages/kafka/bin directory. This is the same directory that contains the vkconfig utility, so if you have already added it to your search path, you can run kafkacat without specifying its full path. Otherwise, you can add the directory to your shell's PATH environment variable with the command:

export PATH=/opt/vertica/packages/kafka/bin:$PATH
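
If you want this change to persist across login sessions, you can append the same setting to your shell's startup file. For example, assuming your shell is bash:

echo 'export PATH=/opt/vertica/packages/kafka/bin:$PATH' >> ~/.bashrc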

Executing kafkacat without any arguments gives you a basic help message:

$ kafkacat
Error: -b <broker,..> missing

Usage: kafkacat <options> [file1 file2 .. | topic1 topic2 ..]]
kafkacat - Apache Kafka producer and consumer tool
https://github.com/edenhill/kafkacat
Copyright (c) 2014-2015, Magnus Edenhill
Version releases/VER_8_1_RELEASE_BUILD_1_555_20170615-4931-g3fb918 (librdkafka releases/VER_8_1_RELEASE_BUILD_1_555_20170615-4931-g3fb918)


General options:
  -C | -P | -L       Mode: Consume, Produce or metadata List
  -G <group-id>      Mode: High-level KafkaConsumer (Kafka 0.9 balanced consumer groups)
                     Expects a list of topics to subscribe to
  -t <topic>         Topic to consume from, produce to, or list
  -p <partition>     Partition
  -b <brokers,..>    Bootstrap broker(s) (host[:port])
  -D <delim>         Message delimiter character:
                     a-z.. | \r | \n | \t | \xNN
                     Default: \n
  -K <delim>         Key delimiter (same format as -D)
  -c <cnt>           Limit message count
  -X list            List available librdkafka configuration properties
  -X prop=val        Set librdkafka configuration property.
                     Properties prefixed with "topic." are
                     applied as topic properties.
  -X dump            Dump configuration and exit.
  -d <dbg1,...>      Enable librdkafka debugging:
                     all,generic,broker,topic,metadata,queue,msg,protocol,cgrp,security,fetch,feature
  -q                 Be quiet (verbosity set to 0)
  -v                 Increase verbosity
  -V                 Print version

Producer options:
  -z snappy|gzip     Message compression. Default: none
  -p -1              Use random partitioner
  -D <delim>         Delimiter to split input into messages
  -K <delim>         Delimiter to split input key and message
  -l                 Send messages from a file separated by
                     delimiter, as with stdin.
                     (only one file allowed)
  -T                 Output sent messages to stdout, acting like tee.
  -c <cnt>           Exit after producing this number of messages
  -Z                 Send empty messages as NULL messages
  file1 file2..      Read messages from files.
                     With -l, only one file permitted.
                     Otherwise, the entire file contents will
                     be sent as one single message.

Consumer options:
  -o <offset>        Offset to start consuming from:
                     beginning | end | stored |
                     <value>  (absolute offset) |
                     -<value> (relative offset from end)
  -e                 Exit successfully when last message received
  -f <fmt..>         Output formatting string, see below.
                     Takes precedence over -D and -K.
  -D <delim>         Delimiter to separate messages on output
  -K <delim>         Print message keys prefixing the message
                     with specified delimiter.
  -O                 Print message offset using -K delimiter
  -c <cnt>           Exit after consuming this number of messages
  -Z                 Print NULL messages and keys as "NULL"(instead of empty)
  -u                 Unbuffered output

Metadata options:
  -t <topic>         Topic to query (optional)


Format string tokens:
  %s                 Message payload
  %S                 Message payload length (or -1 for NULL)
  %R                 Message payload length (or -1 for NULL) serialized
                     as a binary big endian 32-bit signed integer
  %k                 Message key
  %K                 Message key length (or -1 for NULL)
  %t                 Topic
  %p                 Partition
  %o                 Message offset
  \n \r \t           Newlines, tab
  \xXX \xNNN         Any ASCII character
 Example:
  -f 'Topic %t [%p] at offset %o: key %k: %s\n'


Consumer mode (writes messages to stdout):
  kafkacat -b <broker> -t <topic> -p <partition>
 or:
  kafkacat -C -b ...

High-level KafkaConsumer mode:
  kafkacat -b <broker> -G <group-id> topic1 top2 ^aregex\d+

Producer mode (reads messages from stdin):
  ... | kafkacat -b <broker> -t <topic> -p <partition>
 or:
  kafkacat -P -b ...

Metadata listing:
  kafkacat -L -b <broker> [-t <topic>]

Testing connectivity to a Kafka cluster and getting metadata

One basic troubleshooting step you often need to perform is verifying that your Vertica nodes can connect to the Kafka cluster. Successfully executing just about any kafkacat command proves that the Vertica node you are logged into can reach the Kafka cluster. A simple way to verify connectivity is to get the metadata for all of the topics defined on the Kafka cluster. The following example demonstrates using kafkacat's metadata listing command (-L) to connect to the broker named kafka01 running on port 6667 (the Kafka broker port used by Hortonworks Hadoop clusters).

$ kafkacat -L -b kafka01:6667
Metadata for all topics (from broker -1: kafka01:6667/bootstrap):
 2 brokers:
  broker 1001 at kafka03.example.com:6667
  broker 1002 at kafka01.example.com:6667
 4 topics:
  topic "iot-data" with 3 partitions:
    partition 2, leader 1002, replicas: 1002, isrs: 1002
    partition 1, leader 1001, replicas: 1001, isrs: 1001
    partition 0, leader 1002, replicas: 1002, isrs: 1002
  topic "__consumer_offsets" with 50 partitions:
    partition 23, leader 1001, replicas: 1002,1001, isrs: 1001,1002
    partition 41, leader 1001, replicas: 1002,1001, isrs: 1001,1002
    partition 32, leader 1002, replicas: 1002,1001, isrs: 1001,1002
    partition 8, leader 1002, replicas: 1002,1001, isrs: 1001,1002
    partition 17, leader 1001, replicas: 1002,1001, isrs: 1001,1002
    partition 44, leader 1002, replicas: 1002,1001, isrs: 1001,1002
    partition 35, leader 1001, replicas: 1002,1001, isrs: 1001,1002
    partition 26, leader 1002, replicas: 1002,1001, isrs: 1001,1002
    partition 11, leader 1001, replicas: 1002,1001, isrs: 1001,1002
    partition 29, leader 1001, replicas: 1002,1001, isrs: 1001,1002
    partition 38, leader 1002, replicas: 1002,1001, isrs: 1001,1002
    partition 47, leader 1001, replicas: 1002,1001, isrs: 1001,1002
    partition 20, leader 1002, replicas: 1002,1001, isrs: 1001,1002
    partition 2, leader 1002, replicas: 1002,1001, isrs: 1001,1002
    partition 5, leader 1001, replicas: 1002,1001, isrs: 1001,1002
    partition 14, leader 1002, replicas: 1002,1001, isrs: 1001,1002
    partition 46, leader 1002, replicas: 1002,1001, isrs: 1001,1002
    partition 49, leader 1001, replicas: 1002,1001, isrs: 1001,1002
    partition 40, leader 1002, replicas: 1002,1001, isrs: 1001,1002
    partition 4, leader 1002, replicas: 1002,1001, isrs: 1001,1002
    partition 13, leader 1001, replicas: 1002,1001, isrs: 1001,1002
    partition 22, leader 1002, replicas: 1002,1001, isrs: 1001,1002
    partition 31, leader 1001, replicas: 1002,1001, isrs: 1001,1002
    partition 16, leader 1002, replicas: 1002,1001, isrs: 1001,1002
    partition 7, leader 1001, replicas: 1002,1001, isrs: 1001,1002
    partition 43, leader 1001, replicas: 1002,1001, isrs: 1001,1002
    partition 25, leader 1001, replicas: 1002,1001, isrs: 1001,1002
    partition 34, leader 1002, replicas: 1002,1001, isrs: 1001,1002
    partition 10, leader 1002, replicas: 1002,1001, isrs: 1001,1002
    partition 37, leader 1001, replicas: 1002,1001, isrs: 1001,1002
    partition 1, leader 1001, replicas: 1002,1001, isrs: 1001,1002
    partition 19, leader 1001, replicas: 1002,1001, isrs: 1001,1002
    partition 28, leader 1002, replicas: 1002,1001, isrs: 1001,1002
    partition 45, leader 1001, replicas: 1002,1001, isrs: 1001,1002
    partition 36, leader 1002, replicas: 1002,1001, isrs: 1001,1002
    partition 27, leader 1001, replicas: 1002,1001, isrs: 1001,1002
    partition 9, leader 1001, replicas: 1002,1001, isrs: 1001,1002
    partition 18, leader 1002, replicas: 1002,1001, isrs: 1001,1002
    partition 21, leader 1001, replicas: 1002,1001, isrs: 1001,1002
    partition 48, leader 1002, replicas: 1002,1001, isrs: 1001,1002
    partition 12, leader 1002, replicas: 1002,1001, isrs: 1001,1002
    partition 3, leader 1001, replicas: 1002,1001, isrs: 1001,1002
    partition 30, leader 1002, replicas: 1002,1001, isrs: 1001,1002
    partition 39, leader 1001, replicas: 1002,1001, isrs: 1001,1002
    partition 15, leader 1001, replicas: 1002,1001, isrs: 1001,1002
    partition 42, leader 1002, replicas: 1002,1001, isrs: 1001,1002
    partition 24, leader 1002, replicas: 1002,1001, isrs: 1001,1002
    partition 33, leader 1001, replicas: 1002,1001, isrs: 1001,1002
    partition 6, leader 1002, replicas: 1002,1001, isrs: 1001,1002
    partition 0, leader 1002, replicas: 1002,1001, isrs: 1001,1002
  topic "web_hits" with 1 partitions:
    partition 0, leader 1001, replicas: 1001, isrs: 1001
  topic "ambari_kafka_service_check" with 1 partitions:
    partition 0, leader 1002, replicas: 1002, isrs: 1002

You can also use this output to verify the topics defined on your Kafka cluster, as well as the number of partitions in each topic. You need this information when copying data between Kafka and Vertica.
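
If you are only interested in a single topic, you can limit the metadata listing by naming the topic with the -t option. For example, to list metadata for just the iot-data topic shown above:

$ kafkacat -L -b kafka01:6667 -t iot-data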

Retrieving messages from a Kafka topic

When you are troubleshooting issues with streaming messages from Kafka into Vertica, you often want to look at the raw data that Kafka sent. For example, you may want to verify that the messages are in the format that you expect. Or, you may want to review specific messages to see whether some of them were not in the right format for Vertica to parse. You can use kafkacat to read messages from a topic using its consume command (-C). At the very least, you must pass kafkacat the brokers (-b argument) and the topic you want to read from (-t). You can also choose to read messages from a specific offset (-o) and partition (-p). You will usually also want kafkacat to exit after completing the data read (-e) instead of continuing to wait for more messages.

This example gets the last message in the topic named web_hits. The offset argument uses a negative value, which tells kafkacat to read backwards from the end of the topic; -o -1 starts one message before the end, so only the final message is returned.

$ kafkacat -C -b kafka01:6667 -t web_hits -o -1 -e
{"url": "wp-content/list/search.php", "ip": "132.74.240.52",
"date": "2018/03/28 14:12:34",
"user_agent": "Mozilla/5.0 (iPod; U; CPU iPhone OS 4_2 like
Mac OS X; sl-SI) AppleWebKit/532.22.4 (KHTML, like Gecko) Version/3.0.5
Mobile/8B117 Safari/6532.22.4"}
% Reached end of topic web_hits [0] at offset 54932: exiting
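
If you need more context about each message, such as its partition and offset, you can supply an output formatting string with the -f option using the format tokens shown in the help output above. The following sketch labels the last five messages in the topic; the exact format string is just one example:

$ kafkacat -C -b kafka01:6667 -t web_hits -o -5 -e -f 'Partition %p at offset %o: %s\n'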

You can also read a specific range of messages by specifying an offset and a limit (-c argument). For example, you may want to look at a specific range of data to determine why Vertica could not load it. The following example reads 10 messages from the topic iot-data starting at offset 3280:

$ kafkacat -C -b kafka01:6667 -t iot-data -o 3280 -c 10 -e
63680, 19, 24.439323, 26.0128725
43510, 71, 162.319805, -37.4924025
91113, 53, 139.764857, -74.735731
88508, 14, -85.821967, -7.236280
20419, 31, -129.583988, 13.995481
79365, 79, 153.184594, 51.1583485
26283, 25, -168.911020, 35.331027
32111, 56, -157.930451, 82.2676385
56808, 17, 19.603286, -0.7698495
9118, 73, 97.365445, -74.8593245
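
Because offsets are tracked per partition, you may also want to restrict the read to a single partition with the -p option when a topic has more than one. For example, the following command reads the first five messages from partition 2 of the iot-data topic:

$ kafkacat -C -b kafka01:6667 -t iot-data -p 2 -o beginning -c 5 -e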

Generating data for a Kafka topic

If you are preparing to stream data from a Kafka topic that is not yet active, you may want a way to send test messages to it. You can then verify that the topic's messages load into Vertica without worrying that you will miss actual data.

To send data to Kafka, use kafkacat's produce command (-P). The easiest way to supply it with messages is to pipe them in via STDIN, one message per line. You can choose a specific partition for the data, or have kafkacat assign each message to a random partition by setting the partition number (-p) to -1. For example, suppose you have a file named iot_data.csv whose contents you want to produce to random partitions of a Kafka topic named iot-data. You could use the following command:

$ cat iot_data.csv | kafkacat -P -p -1 -b kafka01:6667 -t iot-data
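
If you do not have a data file handy, you can also generate a few test messages inline. The following example pipes two sample messages (the values here are arbitrary placeholders) into the same topic:

$ printf '1001, 1, 10.000000, 20.000000\n1002, 2, 30.000000, 40.000000\n' | kafkacat -P -p -1 -b kafka01:6667 -t iot-data

Alternatively, kafkacat can read the file itself if you pass the -l flag along with the file name, as described in the producer options above:

$ kafkacat -P -p -1 -b kafka01:6667 -t iot-data -l iot_data.csv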