通过使用者组监控 Vertica 消息使用情况
Apache Kafka 具有名为使用者组的功能,可帮助在多组使用者之间分配消息使用负载。采用使用者组时,Kafka 会根据组中的使用者数量来平均分配消息。使用者会向 Kafka 代理报告已成功读取的消息。此报告可帮助 Kafka 管理主题分区中的消息偏移量,以便不会向组中的任何使用者发送两次相同的消息。
在管理负载分配或防止重复加载消息方面,Vertica 不依赖于 Kafka 的使用者组。流式传输作业调度程序会自行管理主题分区偏移量。
即使 Vertica 不需要使用者组来管理偏移量,它也会向 Kafka 代理报告已使用的消息。借助此功能,您可以在 Vertica 群集加载消息时使用第三方工具监控其进度。默认情况下,Vertica 会向名为 vertica-databaseName(其中 databaseName 是 Vertica 数据库的名称)的使用者组报告其进度。在定义调度程序时或手动加载数据期间,可以更改 Vertica 向其报告进度的使用者组的名称。在加载数据时,第三方工具可以让 Kafka 代理监控 Vertica 群集的进度。
例如,可以使用 Kafka 的 kafka-consumer-groups.sh
脚本(位于 Kafka 安装的 bin
目录中)查看 Vertica 使用者组的状态。以下示例演示了如何列出 Kafka 群集中定义的可用使用者组并显示 Vertica 使用者组的详细信息:
$ cd /path/to/kafka/bin
$ ./kafka-consumer-groups.sh --list --bootstrap-server localhost:9092
Note: This will not show information about old Zookeeper-based consumers.
vertica-vmart
$ ./kafka-consumer-groups.sh --describe --group vertica-vmart \
--bootstrap-server localhost:9092
Note: This will not show information about old Zookeeper-based consumers.
Consumer group 'vertica-vmart' has no active members.
TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
web_hits 0 24500 30000 5500 - - -
从输出中,可以看到 Vertica 向 vertica-vmart 使用者组报告消息使用情况。此组是 Vertica 加载示例 VMart 数据库时的默认使用者组。第二个命令列出了 vertica-vmart 使用者组正在使用的主题。可以看到 Vertica 群集已读取主题唯一分区中的 24500 条消息(总共 30000 条)。稍后,运行相同的命令将显示 Vertica 群集的进度:
$ cd /path/to/kafka/bin
$ ./kafka-consumer-groups.sh --describe --group vertica-vmart \
--bootstrap-server localhost:9092
Note: This will not show information about old Zookeeper-based consumers.
Consumer group 'vertica-vmart' has no active members.
TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
web_hits 0 30000 30000 0 -
更改 Vertica 向其报告进度的使用者组
可以更改 Vertica 在使用消息时向其报告进度的使用者组。
使用调度程序自动加载时进行更改
使用调度程序时,可通过将 --consumer-group-id
实参设为 vkconfig 脚本的调度程序或微批处理实用程序来设置使用者组。例如,假设您希望设置调度程序中显示的示例调度程序向名为 vertica-database 的使用者组报告其使用情况。那么,可以使用以下命令:
$ /opt/vertica/packages/kafka/bin/vkconfig microbatch --update \
--conf weblog.conf --microbatch weblog --consumer-group-id vertica-database
当调度程序开始加载数据时,它将开始更新新的使用者组。使用 kafka-consumer-groups.sh
可以在 Kafka 节点上看到此新使用者组。
使用 --list
选项可返回使用者组:
$ /path/to/kafka/bin/kafka-consumer-groups.sh --list --bootstrap-server localhost:9092
Note: This will not show information about old Zookeeper-based consumers.
vertica-database
vertica-vmart
使用 --describe
和 --group
选项可返回有关特定使用者组的详细信息:
$ /path/to/kafka/bin/kafka-consumer-groups.sh --describe --group vertica-database \
--bootstrap-server localhost:9092
Note: This will not show information about old Zookeeper-based consumers.
Consumer group 'vertica-database' has no active members.
TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
web_hits 0 30300 30300 0 - - -
手动加载时进行更改
要在手动加载数据时更改使用者组,请使用 KafkaSource 函数的 group_id 参数:
=> COPY web_hits SOURCE KafkaSource(stream='web_hits|0|-2',
brokers='kafka01.example.com:9092',
stop_on_eof=True,
group_id='vertica_database')
PARSER KafkaJSONParser();
Rows Loaded
-------------
50000
(1 row)
加载消息时采用使用者组偏移量
可以选择让调度程序、手动加载或自定义加载脚本从使用者组的偏移量处开始加载消息。要从存储在使用者组中的最后偏移量处加载消息,请使用特殊的偏移量 -3。
使用调度程序自动加载的示例
要指示调度程序从使用者组已保存的偏移量处加载消息,请使用 vkconfig 脚本微批处理工具的 --offset
实参。
-
停止使用 shutdown 命令的调度程序以及用于创建调度程序的配置文件:
$ /opt/vertica/packages/kafka/bin/vkconfig microbatch shutdown --conf weblog.conf
-
将微批处理 --offset 选项设为 -3:
$ /opt/vertica/packages/kafka/bin/vkconfig microbatch --update --conf weblog.conf --microbatch weblog --offset -3
这会将调度程序从其读取的所有主题分区的偏移量设置为 -3。调度程序将采用使用者组已保存的偏移量开始下一次加载,并且所有后续加载均将使用保存在 stream_microbatch_history 中的偏移量。
手动加载示例
以下示例将从 web_hits 主题加载消息,该主题具有包含 51,000 条消息的分区。有关使用 KafkaSource 进行手动加载的详细信息,请参阅手动使用来自 Kafka 的数据。
-
第一个 COPY 语句将创建名为 vertica_manual 的使用者组,并从 web_hits 主题中的第一个分区加载前 50,000 条消息:
=> COPY web_hits SOURCE KafkaSource(stream='web_hits|0|0|50000', brokers='kafka01.example.com:9092', stop_on_eof=True, group_id='vertica_manual') PARSER KafkaJSONParser() REJECTED DATA AS TABLE public.web_hits_rejections; Rows Loaded ------------- 50000 (1 row)
-
下一个 COPY 语句将传递 -3 作为 start_offset 流参数,以便从使用者组已保存的偏移量处进行加载:
=> COPY web_hits SOURCE KafkaSource(stream='web_hits|0|-3', brokers='kafka01.example.com:9092', stop_on_eof=True, group_id='vertica_manual') PARSER KafkaJSONParser() REJECTED DATA AS TABLE public.web_hits_rejections; Rows Loaded ------------- 1000 (1 row)
禁用使用者组报告
默认情况下,Vertica 会向 Kafka 报告其使用的消息的偏移量。如果没有专门为 Vertica 配置使用者组,它仍会向名为 vertica_database-name(其中 database- name 是 Vertica 当前正在运行的数据库的名称)的使用者组报告其偏移量。
如果要完全禁止 Vertica 向 Kafka 报告其使用情况,可以将使用者组设为空字符串或 NULL。例如:
=> COPY web_hits SOURCE KafkaSource(stream='web_hits|0|-2',
brokers='kafka01.example.com:9092',
stop_on_eof=True,
group_id=NULL)
PARSER KafkaJsonParser();
Rows Loaded
-------------
60000
(1 row)