Using kafkacat to troubleshoot Kafka integration issues
Kafkacat is a third-party open-source utility that lets you connect to Kafka from the Linux command line. It uses the same underlying library that the Vertica integration with Apache Kafka uses to connect to Kafka. This shared library makes kafkacat a useful tool for testing and debugging your Vertica integration with Kafka.
You may find kafkacat useful for:
- Testing connectivity between your Vertica and Kafka clusters.
- Examining Kafka data for anomalies that may prevent some of it from loading into Vertica.
- Producing data for test loads into Vertica.
- Listing details about Kafka topics.
For more information about kafkacat, see its project page on GitHub.
Running kafkacat on Vertica nodes
The kafkacat utility is bundled in the Vertica install package, so it is available in the /opt/vertica/packages/kafka/bin directory on all nodes of your Vertica cluster. This is the same directory that contains the vkconfig utility, so if you have already added it to your PATH, you can use the kafkacat utility without specifying its full path. Otherwise, you can add this path to your shell's environment variable using the command:
export PATH=/opt/vertica/packages/kafka/bin:$PATH
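To confirm that your shell now resolves the bundled copy of kafkacat (a quick check, assuming a Bash-compatible shell), you can run:
$ which kafkacat
/opt/vertica/packages/kafka/bin/kafkacat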
Note
On Debian and Ubuntu systems, you must tell kafkacat to use Vertica's own copy of the SSL libraries by setting the LD_LIBRARY_PATH environment variable:
$ export LD_LIBRARY_PATH=/opt/vertica/lib
If you do not set this environment variable, the kafkacat utility exits with the error:
kafkacat: error while loading shared libraries: libcrypto.so.10:
cannot open shared object file: No such file or directory
Executing kafkacat without any arguments displays a basic help message:
$ kafkacat
Error: -b <broker,..> missing
Usage: kafkacat <options> [file1 file2 .. | topic1 topic2 ..]]
kafkacat - Apache Kafka producer and consumer tool
https://github.com/edenhill/kafkacat
Copyright (c) 2014-2015, Magnus Edenhill
Version releases/VER_8_1_RELEASE_BUILD_1_555_20170615-4931-g3fb918 (librdkafka releases/VER_8_1_RELEASE_BUILD_1_555_20170615-4931-g3fb918)
General options:
-C | -P | -L Mode: Consume, Produce or metadata List
-G <group-id> Mode: High-level KafkaConsumer (Kafka 0.9 balanced consumer groups)
Expects a list of topics to subscribe to
-t <topic> Topic to consume from, produce to, or list
-p <partition> Partition
-b <brokers,..> Bootstrap broker(s) (host[:port])
-D <delim> Message delimiter character:
a-z.. | \r | \n | \t | \xNN
Default: \n
-K <delim> Key delimiter (same format as -D)
-c <cnt> Limit message count
-X list List available librdkafka configuration properties
-X prop=val Set librdkafka configuration property.
Properties prefixed with "topic." are
applied as topic properties.
-X dump Dump configuration and exit.
-d <dbg1,...> Enable librdkafka debugging:
all,generic,broker,topic,metadata,queue,msg,protocol,cgrp,security,fetch,feature
-q Be quiet (verbosity set to 0)
-v Increase verbosity
-V Print version
Producer options:
-z snappy|gzip Message compression. Default: none
-p -1 Use random partitioner
-D <delim> Delimiter to split input into messages
-K <delim> Delimiter to split input key and message
-l Send messages from a file separated by
delimiter, as with stdin.
(only one file allowed)
-T Output sent messages to stdout, acting like tee.
-c <cnt> Exit after producing this number of messages
-Z Send empty messages as NULL messages
file1 file2.. Read messages from files.
With -l, only one file permitted.
Otherwise, the entire file contents will
be sent as one single message.
Consumer options:
-o <offset> Offset to start consuming from:
beginning | end | stored |
<value> (absolute offset) |
-<value> (relative offset from end)
-e Exit successfully when last message received
-f <fmt..> Output formatting string, see below.
Takes precedence over -D and -K.
-D <delim> Delimiter to separate messages on output
-K <delim> Print message keys prefixing the message
with specified delimiter.
-O Print message offset using -K delimiter
-c <cnt> Exit after consuming this number of messages
-Z Print NULL messages and keys as "NULL"(instead of empty)
-u Unbuffered output
Metadata options:
-t <topic> Topic to query (optional)
Format string tokens:
%s Message payload
%S Message payload length (or -1 for NULL)
%R Message payload length (or -1 for NULL) serialized
as a binary big endian 32-bit signed integer
%k Message key
%K Message key length (or -1 for NULL)
%t Topic
%p Partition
%o Message offset
\n \r \t Newlines, tab
\xXX \xNNN Any ASCII character
Example:
-f 'Topic %t [%p] at offset %o: key %k: %s\n'
Consumer mode (writes messages to stdout):
kafkacat -b <broker> -t <topic> -p <partition>
or:
kafkacat -C -b ...
High-level KafkaConsumer mode:
kafkacat -b <broker> -G <group-id> topic1 top2 ^aregex\d+
Producer mode (reads messages from stdin):
... | kafkacat -b <broker> -t <topic> -p <partition>
or:
kafkacat -P -b ...
Metadata listing:
kafkacat -L -b <broker> [-t <topic>]
Testing connectivity to a Kafka cluster and getting metadata
Verifying that your Vertica nodes can connect to your Kafka cluster is a basic troubleshooting step you often need to perform. Successfully executing almost any kafkacat command proves that the Vertica node you are logged into is able to reach the Kafka cluster. One simple command you can execute to verify connectivity is to get the metadata for all of the topics the Kafka cluster has defined. The following example demonstrates using kafkacat's metadata listing command to connect to a broker named kafka01 running on port 6667 (the Kafka broker port used by Hortonworks Hadoop clusters).
$ kafkacat -L -b kafka01:6667
Metadata for all topics (from broker -1: kafka01:6667/bootstrap):
2 brokers:
broker 1001 at kafka03.example.com:6667
broker 1002 at kafka01.example.com:6667
4 topics:
topic "iot-data" with 3 partitions:
partition 2, leader 1002, replicas: 1002, isrs: 1002
partition 1, leader 1001, replicas: 1001, isrs: 1001
partition 0, leader 1002, replicas: 1002, isrs: 1002
topic "__consumer_offsets" with 50 partitions:
partition 23, leader 1001, replicas: 1002,1001, isrs: 1001,1002
partition 41, leader 1001, replicas: 1002,1001, isrs: 1001,1002
partition 32, leader 1002, replicas: 1002,1001, isrs: 1001,1002
partition 8, leader 1002, replicas: 1002,1001, isrs: 1001,1002
partition 17, leader 1001, replicas: 1002,1001, isrs: 1001,1002
partition 44, leader 1002, replicas: 1002,1001, isrs: 1001,1002
partition 35, leader 1001, replicas: 1002,1001, isrs: 1001,1002
partition 26, leader 1002, replicas: 1002,1001, isrs: 1001,1002
partition 11, leader 1001, replicas: 1002,1001, isrs: 1001,1002
partition 29, leader 1001, replicas: 1002,1001, isrs: 1001,1002
partition 38, leader 1002, replicas: 1002,1001, isrs: 1001,1002
partition 47, leader 1001, replicas: 1002,1001, isrs: 1001,1002
partition 20, leader 1002, replicas: 1002,1001, isrs: 1001,1002
partition 2, leader 1002, replicas: 1002,1001, isrs: 1001,1002
partition 5, leader 1001, replicas: 1002,1001, isrs: 1001,1002
partition 14, leader 1002, replicas: 1002,1001, isrs: 1001,1002
partition 46, leader 1002, replicas: 1002,1001, isrs: 1001,1002
partition 49, leader 1001, replicas: 1002,1001, isrs: 1001,1002
partition 40, leader 1002, replicas: 1002,1001, isrs: 1001,1002
partition 4, leader 1002, replicas: 1002,1001, isrs: 1001,1002
partition 13, leader 1001, replicas: 1002,1001, isrs: 1001,1002
partition 22, leader 1002, replicas: 1002,1001, isrs: 1001,1002
partition 31, leader 1001, replicas: 1002,1001, isrs: 1001,1002
partition 16, leader 1002, replicas: 1002,1001, isrs: 1001,1002
partition 7, leader 1001, replicas: 1002,1001, isrs: 1001,1002
partition 43, leader 1001, replicas: 1002,1001, isrs: 1001,1002
partition 25, leader 1001, replicas: 1002,1001, isrs: 1001,1002
partition 34, leader 1002, replicas: 1002,1001, isrs: 1001,1002
partition 10, leader 1002, replicas: 1002,1001, isrs: 1001,1002
partition 37, leader 1001, replicas: 1002,1001, isrs: 1001,1002
partition 1, leader 1001, replicas: 1002,1001, isrs: 1001,1002
partition 19, leader 1001, replicas: 1002,1001, isrs: 1001,1002
partition 28, leader 1002, replicas: 1002,1001, isrs: 1001,1002
partition 45, leader 1001, replicas: 1002,1001, isrs: 1001,1002
partition 36, leader 1002, replicas: 1002,1001, isrs: 1001,1002
partition 27, leader 1001, replicas: 1002,1001, isrs: 1001,1002
partition 9, leader 1001, replicas: 1002,1001, isrs: 1001,1002
partition 18, leader 1002, replicas: 1002,1001, isrs: 1001,1002
partition 21, leader 1001, replicas: 1002,1001, isrs: 1001,1002
partition 48, leader 1002, replicas: 1002,1001, isrs: 1001,1002
partition 12, leader 1002, replicas: 1002,1001, isrs: 1001,1002
partition 3, leader 1001, replicas: 1002,1001, isrs: 1001,1002
partition 30, leader 1002, replicas: 1002,1001, isrs: 1001,1002
partition 39, leader 1001, replicas: 1002,1001, isrs: 1001,1002
partition 15, leader 1001, replicas: 1002,1001, isrs: 1001,1002
partition 42, leader 1002, replicas: 1002,1001, isrs: 1001,1002
partition 24, leader 1002, replicas: 1002,1001, isrs: 1001,1002
partition 33, leader 1001, replicas: 1002,1001, isrs: 1001,1002
partition 6, leader 1002, replicas: 1002,1001, isrs: 1001,1002
partition 0, leader 1002, replicas: 1002,1001, isrs: 1001,1002
topic "web_hits" with 1 partitions:
partition 0, leader 1001, replicas: 1001, isrs: 1001
topic "ambari_kafka_service_check" with 1 partitions:
partition 0, leader 1002, replicas: 1002, isrs: 1002
You can also use this output to verify the topics defined by your Kafka cluster, as well as the number of partitions each topic defines. You need this information when copying data between Kafka and Vertica.
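If you only need the details for a single topic, you can limit the metadata query with the -t option shown in the help output above. For example, a minimal sketch that lists just the web_hits topic defined in this cluster:
$ kafkacat -L -b kafka01:6667 -t web_hits
The output has the same format as the full listing, but includes only the broker list and the partitions of the topic you named.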
Retrieving messages from a Kafka topic
When you are troubleshooting issues with streaming messages from Kafka into Vertica, you often want to look at the raw data that Kafka has sent. For example, you may want to verify that the messages are in the format you expect. Or, you may want to review specific messages to confirm whether some of them are not in a format Vertica can parse. You can use kafkacat to read messages from a topic using its consume command (-C). At a minimum, you must pass kafkacat the list of brokers (the -b argument) and the topic you want to read from (-t). You can also choose to read messages from a specific offset (-o) and partition (-p). You usually also want kafkacat to exit after it finishes reading the data (-e), rather than continue waiting for more messages.
The following example retrieves the last message in the topic named web_hits. The negative value for the offset argument tells kafkacat to read from the end of the topic.
$ kafkacat -C -b kafka01:6667 -t web_hits -o -1 -e
{"url": "wp-content/list/search.php", "ip": "132.74.240.52",
"date": "2018/03/28 14:12:34",
"user_agent": "Mozilla/5.0 (iPod; U; CPU iPhone OS 4_2 like
Mac OS X; sl-SI) AppleWebKit/532.22.4 (KHTML, like Gecko) Version/3.0.5
Mobile/8B117 Safari/6532.22.4"}
% Reached end of topic web_hits [0] at offset 54932: exiting
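If you suspect that the problem messages are confined to one partition, you can restrict the read to that partition with the -p argument. For example, a sketch (assuming the iot-data topic shown in the earlier metadata listing) that reads the first five messages from partition 2:
$ kafkacat -C -b kafka01:6667 -t iot-data -p 2 -o beginning -c 5 -e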
You can also read a specific range of messages by specifying an offset and a limit (the -c argument). For example, you may want to look at a specific range of data to determine why Vertica could not load it. The following example reads 10 messages from the topic iot-data starting at offset 3280:
$ kafkacat -C -b kafka01:6667 -t iot-data -o 3280 -c 10 -e
63680, 19, 24.439323, 26.0128725
43510, 71, 162.319805, -37.4924025
91113, 53, 139.764857, -74.735731
88508, 14, -85.821967, -7.236280
20419, 31, -129.583988, 13.995481
79365, 79, 153.184594, 51.1583485
26283, 25, -168.911020, 35.331027
32111, 56, -157.930451, 82.2676385
56808, 17, 19.603286, -0.7698495
9118, 73, 97.365445, -74.8593245
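When the payload alone does not tell you where a message came from, the -f formatting option from the help output above can print the topic, partition, offset, and key alongside each payload. For example, a sketch that rereads the same range with that extra context:
$ kafkacat -C -b kafka01:6667 -t iot-data -o 3280 -c 10 -e -f 'Topic %t [%p] at offset %o: key %k: %s\n'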
Producing data for a Kafka topic
If you are preparing to stream data from a Kafka topic that is not yet active, you may want a way to stream test messages. You can then verify that the topic's messages load into Vertica without worrying about missing actual data.
To send data to Kafka, use kafkacat's produce command (-P). The easiest way to supply it with messages is to pipe them in via STDIN, one message per line. You can choose a specific partition for the data, or have kafkacat assign each message to a random partition by setting the partition number to -1. For example, suppose you have a file named iot_data.csv that you want to produce to random partitions of a Kafka topic named iot-data. Then you would use this command:
$ cat iot_data.csv | kafkacat -P -p -1 -b kafka01:6667 -t iot-data
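If you only need a handful of test messages rather than an entire file, you can pipe them in directly. For example, a minimal sketch (the field values here are invented for illustration) that produces one message in the same format as the iot-data samples above and then reads it back to confirm it arrived:
$ echo "12345, 42, 24.439323, 26.0128725" | kafkacat -P -p -1 -b kafka01:6667 -t iot-data
$ kafkacat -C -b kafka01:6667 -t iot-data -o -1 -e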