使用 kafkacat 对 Kafka 集成问题进行故障排除

Kafkacat 是第三方开源实用程序,可用于从 Linux 命令行连接到 Kafka。它使用 Vertica 与 Apache Kafka 的集成用于连接到 Kafka 的相同底层库。这一共享库使得 kafkcat 成为一款测试和调试 Vertica 与 Kafka 的集成的有用工具。

在以下方面,您可能会发现 kafkacat 非常有用:

  • 测试 Vertica 与 Kafka 群集之间的连接。

  • 检查 Kafka 数据是否存在可能会阻止其中某些数据加载到 Vertica 中的异常。

  • 生成数据以供测试加载到 Vertica 中。

  • 列出有关 Kafka 主题的详细信息。

有关 kafkacat 的详细信息,请参阅其在 Github 上的项目页面

在 Vertica 节点上运行 kafkacat

Kafkacat 实用程序已捆绑到 Vertica 安装包中,因此可以在 Vertica 群集的所有节点的 /opt/vertica/packages/kafka/bin 目录中找到它。这是包含 vkconfig 实用程序的同一目录,因此如果已将它添加到路径中,则无需指定其完整路径即可使用 kafkacat 实用程序。否则,可以使用以下命令将此路径添加到 shell 的环境变量中:

set PATH=/opt/vertica/packages/kafka/bin:$PATH

执行不含任何实参的 kafkacat 会为您显示基本的帮助消息:

$ kafkacat
Error: -b <broker,..> missing

Usage: kafkacat <options> [file1 file2 .. | topic1 topic2 ..]]
kafkacat - Apache Kafka producer and consumer tool
https://github.com/edenhill/kafkacat
Copyright (c) 2014-2015, Magnus Edenhill
Version releases/VER_8_1_RELEASE_BUILD_1_555_20170615-4931-g3fb918 (librdkafka releases/VER_8_1_RELEASE_BUILD_1_555_20170615-4931-g3fb918)


General options:
  -C | -P | -L       Mode: Consume, Produce or metadata List
  -G <group-id>      Mode: High-level KafkaConsumer (Kafka 0.9 balanced consumer groups)
                     Expects a list of topics to subscribe to
  -t <topic>         Topic to consume from, produce to, or list
  -p <partition>     Partition
  -b <brokers,..>    Bootstrap broker(s) (host[:port])
  -D <delim>         Message delimiter character:
                     a-z.. | \r | \n | \t | \xNN
                     Default: \n
  -K <delim>         Key delimiter (same format as -D)
  -c <cnt>           Limit message count
  -X list            List available librdkafka configuration properties
  -X prop=val        Set librdkafka configuration property.
                     Properties prefixed with "topic." are
                     applied as topic properties.
  -X dump            Dump configuration and exit.
  -d <dbg1,...>      Enable librdkafka debugging:
                     all,generic,broker,topic,metadata,queue,msg,protocol,cgrp,security,fetch,feature
  -q                 Be quiet (verbosity set to 0)
  -v                 Increase verbosity
  -V                 Print version

Producer options:
  -z snappy|gzip     Message compression. Default: none
  -p -1              Use random partitioner
  -D <delim>         Delimiter to split input into messages
  -K <delim>         Delimiter to split input key and message
  -l                 Send messages from a file separated by
                     delimiter, as with stdin.
                     (only one file allowed)
  -T                 Output sent messages to stdout, acting like tee.
  -c <cnt>           Exit after producing this number of messages
  -Z                 Send empty messages as NULL messages
  file1 file2..      Read messages from files.
                     With -l, only one file permitted.
                     Otherwise, the entire file contents will
                     be sent as one single message.

Consumer options:
  -o <offset>        Offset to start consuming from:
                     beginning | end | stored |
                     <value>  (absolute offset) |
                     -<value> (relative offset from end)
  -e                 Exit successfully when last message received
  -f <fmt..>         Output formatting string, see below.
                     Takes precedence over -D and -K.
  -D <delim>         Delimiter to separate messages on output
  -K <delim>         Print message keys prefixing the message
                     with specified delimiter.
  -O                 Print message offset using -K delimiter
  -c <cnt>           Exit after consuming this number of messages
  -Z                 Print NULL messages and keys as "NULL"(instead of empty)
  -u                 Unbuffered output

Metadata options:
  -t <topic>         Topic to query (optional)


Format string tokens:
  %s                 Message payload
  %S                 Message payload length (or -1 for NULL)
  %R                 Message payload length (or -1 for NULL) serialized
                     as a binary big endian 32-bit signed integer
  %k                 Message key
  %K                 Message key length (or -1 for NULL)
  %t                 Topic
  %p                 Partition
  %o                 Message offset
  \n \r \t           Newlines, tab
  \xXX \xNNN         Any ASCII character
 Example:
  -f 'Topic %t [%p] at offset %o: key %k: %s\n'


Consumer mode (writes messages to stdout):
  kafkacat -b <broker> -t <topic> -p <partition>
 or:
  kafkacat -C -b ...

High-level KafkaConsumer mode:
  kafkacat -b <broker> -G <group-id> topic1 top2 ^aregex\d+

Producer mode (reads messages from stdin):
  ... | kafkacat -b <broker> -t <topic> -p <partition>
 or:
  kafkacat -P -b ...

Metadata listing:
  kafkacat -L -b <broker> [-t <topic>]

测试与 Kafka 群集的连接并获取元数据

验证 Vertica 节点是否可以连接到 Kafka 群集是经常需要执行的一项基本故障排除步骤。成功执行几乎任何 kafkacat 命令证明您登录的 Vertica 节点能够访问 Kafka 群集。验证连接时,可以执行的一个简单命令是获取 Kafka 群集已定义的所有主题的元数据。以下示例演示了如何使用 kafkacat 的元数据列出命令连接到在端口 6667(由 Hortonworks Hadoop 群集使用的 Kafka 代理端口)上运行的名为 kafka01 的代理。

$ kafkacat -L -b kafka01:6667
Metadata for all topics (from broker -1: kafka01:6667/bootstrap):
 2 brokers:
  broker 1001 at kafka03.example.com:6667
  broker 1002 at kafka01.example.com:6667
 4 topics:
  topic "iot-data" with 3 partitions:
    partition 2, leader 1002, replicas: 1002, isrs: 1002
    partition 1, leader 1001, replicas: 1001, isrs: 1001
    partition 0, leader 1002, replicas: 1002, isrs: 1002
  topic "__consumer_offsets" with 50 partitions:
    partition 23, leader 1001, replicas: 1002,1001, isrs: 1001,1002
    partition 41, leader 1001, replicas: 1002,1001, isrs: 1001,1002
    partition 32, leader 1002, replicas: 1002,1001, isrs: 1001,1002
    partition 8, leader 1002, replicas: 1002,1001, isrs: 1001,1002
    partition 17, leader 1001, replicas: 1002,1001, isrs: 1001,1002
    partition 44, leader 1002, replicas: 1002,1001, isrs: 1001,1002
    partition 35, leader 1001, replicas: 1002,1001, isrs: 1001,1002
    partition 26, leader 1002, replicas: 1002,1001, isrs: 1001,1002
    partition 11, leader 1001, replicas: 1002,1001, isrs: 1001,1002
    partition 29, leader 1001, replicas: 1002,1001, isrs: 1001,1002
    partition 38, leader 1002, replicas: 1002,1001, isrs: 1001,1002
    partition 47, leader 1001, replicas: 1002,1001, isrs: 1001,1002
    partition 20, leader 1002, replicas: 1002,1001, isrs: 1001,1002
    partition 2, leader 1002, replicas: 1002,1001, isrs: 1001,1002
    partition 5, leader 1001, replicas: 1002,1001, isrs: 1001,1002
    partition 14, leader 1002, replicas: 1002,1001, isrs: 1001,1002
    partition 46, leader 1002, replicas: 1002,1001, isrs: 1001,1002
    partition 49, leader 1001, replicas: 1002,1001, isrs: 1001,1002
    partition 40, leader 1002, replicas: 1002,1001, isrs: 1001,1002
    partition 4, leader 1002, replicas: 1002,1001, isrs: 1001,1002
    partition 13, leader 1001, replicas: 1002,1001, isrs: 1001,1002
    partition 22, leader 1002, replicas: 1002,1001, isrs: 1001,1002
    partition 31, leader 1001, replicas: 1002,1001, isrs: 1001,1002
    partition 16, leader 1002, replicas: 1002,1001, isrs: 1001,1002
    partition 7, leader 1001, replicas: 1002,1001, isrs: 1001,1002
    partition 43, leader 1001, replicas: 1002,1001, isrs: 1001,1002
    partition 25, leader 1001, replicas: 1002,1001, isrs: 1001,1002
    partition 34, leader 1002, replicas: 1002,1001, isrs: 1001,1002
    partition 10, leader 1002, replicas: 1002,1001, isrs: 1001,1002
    partition 37, leader 1001, replicas: 1002,1001, isrs: 1001,1002
    partition 1, leader 1001, replicas: 1002,1001, isrs: 1001,1002
    partition 19, leader 1001, replicas: 1002,1001, isrs: 1001,1002
    partition 28, leader 1002, replicas: 1002,1001, isrs: 1001,1002
    partition 45, leader 1001, replicas: 1002,1001, isrs: 1001,1002
    partition 36, leader 1002, replicas: 1002,1001, isrs: 1001,1002
    partition 27, leader 1001, replicas: 1002,1001, isrs: 1001,1002
    partition 9, leader 1001, replicas: 1002,1001, isrs: 1001,1002
    partition 18, leader 1002, replicas: 1002,1001, isrs: 1001,1002
    partition 21, leader 1001, replicas: 1002,1001, isrs: 1001,1002
    partition 48, leader 1002, replicas: 1002,1001, isrs: 1001,1002
    partition 12, leader 1002, replicas: 1002,1001, isrs: 1001,1002
    partition 3, leader 1001, replicas: 1002,1001, isrs: 1001,1002
    partition 30, leader 1002, replicas: 1002,1001, isrs: 1001,1002
    partition 39, leader 1001, replicas: 1002,1001, isrs: 1001,1002
    partition 15, leader 1001, replicas: 1002,1001, isrs: 1001,1002
    partition 42, leader 1002, replicas: 1002,1001, isrs: 1001,1002
    partition 24, leader 1002, replicas: 1002,1001, isrs: 1001,1002
    partition 33, leader 1001, replicas: 1002,1001, isrs: 1001,1002
    partition 6, leader 1002, replicas: 1002,1001, isrs: 1001,1002
    partition 0, leader 1002, replicas: 1002,1001, isrs: 1001,1002
  topic "web_hits" with 1 partitions:
    partition 0, leader 1001, replicas: 1001, isrs: 1001
  topic "ambari_kafka_service_check" with 1 partitions:
    partition 0, leader 1002, replicas: 1002, isrs: 1002

此外,还可以使用此输出来验证由 Kafka 群集定义的主题,以及每个主题定义的分区数。在 Kafka 与 Vertica 之间复制数据时需要提供此信息。

从 Kafka 主题中检索消息

在 Vertica 中对从 Kafka 流式传输消息相关的问题进行故障排除时,您通常希望查看 Kafka 已发送的原始数据。例如,您可能希望验证消息是否采用预期格式。或者,您可能希望查看特定消息,以确认其中一些消息是否未采用 Vertica 能够解析的正确格式。您可以使用 kafkacat 通过其使用命令 (-C) 从主题中读取消息。至少必须向 kafkacat 传递理(-b 实参)以及要从其读取消息的主题 (-t)。此外,还可以选择从特定偏移量 (-o) 和分区 (-p) 读取消息。您通常还希望 kafkacat 在完成数据读取后退出 (-e),而不是继续等待读取更多消息。

以下示例将获取名为 web_hits 的主题中的最后一条消息。offset 实参将使用负值,从而告知 kafkacat 从主题的结尾处读取消息。

$ kafkacat -C -b kafka01:6667 -t web_hits -o -1 -e
{"url": "wp-content/list/search.php", "ip": "132.74.240.52",
"date": "2018/03/28 14:12:34",
"user_agent": "Mozilla/5.0 (iPod; U; CPU iPhone OS 4_2 like
Mac OS X; sl-SI) AppleWebKit/532.22.4 (KHTML, like Gecko) Version/3.0.5
Mobile/8B117 Safari/6532.22.4"}
% Reached end of topic web_hits [0] at offset 54932: exiting

还可以通过指定偏移量和限值(-c 实参)来读取特定范围的消息。例如,您可能希望查看特定范围的数据以确定 Vertica 无法加载数据的原因。以下示例将从偏移量 3280 处开始读取主题 iot-data 中的 10 条消息:

$ kafkacat -C -b kafka01:6667 -t iot-data -o 3280 -c 10 -e
63680, 19, 24.439323, 26.0128725
43510, 71, 162.319805, -37.4924025
91113, 53, 139.764857, -74.735731
88508, 14, -85.821967, -7.236280
20419, 31, -129.583988, 13.995481
79365, 79, 153.184594, 51.1583485
26283, 25, -168.911020, 35.331027
32111, 56, -157.930451, 82.2676385
56808, 17, 19.603286, -0.7698495
9118, 73, 97.365445, -74.8593245

为 Kafka 主题生成数据

如果准备从尚未处于活动状态的 Kafka 主题流式传输数据,可能需要一种流式传输测试消息的方法。然后,可以验证该主题的消息是否已加载到 Vertica 中,而不必担心会丢失实际数据。

要将数据发送到 Kafka,请使用 kafkacat 的生产命令 (-P)。向它提供消息的最简单方法是通过 STDIN 使用管道传输消息,每行一条消息。您可以为数据选择一个特定的分区,或者通过将分区编号设置为 -1 让 kafkacat 将每条消息随机分配到一个随机分区。例如,假设您有一个名为 iot-data.csv 的文件,并且希望将其生成到名为 iot-data 的 Kafka 主题的随机分区中。那么,您可以使用以下命令:

$ cat iot_data.csv | kafkacat -P -p -1 -b kafka01:6667 -t iot-data