可以通过多种方式监控来自 Kafka 的数据流式传输的进度:
-
监控 Vertica 向其报告进度的使用者组。如果要用于监控数据加载情况的工具能够与 Kafka 协调使用,那么这种技术最好。
-
使用 vkconfig 工具中内置的监控 API。这些 API 以 JSON 格式报告流式传输调度程序的配置和使用情况。如果您正在开发自己的监控脚本,或者您的监控工具可以使用 JSON 格式的状态信息,这些 API 会非常有用。
可以通过多种方式监控来自 Kafka 的数据流式传输的进度:
监控 Vertica 向其报告进度的使用者组。如果要用于监控数据加载情况的工具能够与 Kafka 协调使用,那么这种技术最好。
使用 vkconfig 工具中内置的监控 API。这些 API 以 JSON 格式报告流式传输调度程序的配置和使用情况。如果您正在开发自己的监控脚本,或者您的监控工具可以使用 JSON 格式的状态信息,这些 API 会非常有用。
Apache Kafka 具有名为使用者组的功能,可帮助在多组使用者之间分配消息使用负载。采用使用者组时,Kafka 会根据组中的使用者数量来平均分配消息。使用者会向 Kafka 代理报告已成功读取的消息。此报告可帮助 Kafka 管理主题分区中的消息偏移量,以便不会向组中的任何使用者发送两次相同的消息。
在管理负载分配或防止重复加载消息方面,Vertica 不依赖于 Kafka 的使用者组。流式传输作业调度程序会自行管理主题分区偏移量。
即使 Vertica 不需要使用者组来管理偏移量,它也会向 Kafka 代理报告已使用的消息。借助此功能,您可以在 Vertica 群集加载消息时使用第三方工具监控其进度。默认情况下,Vertica 会向名为 vertica-databaseName(其中 databaseName 是 Vertica 数据库的名称)的使用者组报告其进度。在定义调度程序时或手动加载数据期间,可以更改 Vertica 向其报告进度的使用者组的名称。在加载数据时,第三方工具可以让 Kafka 代理监控 Vertica 群集的进度。
例如,可以使用 Kafka 的 kafka-consumer-groups.sh
脚本(位于 Kafka 安装的 bin
目录中)查看 Vertica 使用者组的状态。以下示例演示了如何列出 Kafka 群集中定义的可用使用者组并显示 Vertica 使用者组的详细信息:
$ cd /path/to/kafka/bin
$ ./kafka-consumer-groups.sh --list --bootstrap-server localhost:9092
Note: This will not show information about old Zookeeper-based consumers.
vertica-vmart
$ ./kafka-consumer-groups.sh --describe --group vertica-vmart \
--bootstrap-server localhost:9092
Note: This will not show information about old Zookeeper-based consumers.
Consumer group 'vertica-vmart' has no active members.
TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
web_hits 0 24500 30000 5500 - - -
从输出中,可以看到 Vertica 向 vertica-vmart 使用者组报告消息使用情况。此组是 Vertica 加载示例 VMart 数据库时的默认使用者组。第二个命令列出了 vertica-vmart 使用者组正在使用的主题。可以看到 Vertica 群集已读取主题唯一分区中的 24500 条消息(总共 30000 条)。稍后,运行相同的命令将显示 Vertica 群集的进度:
$ cd /path/to/kafka/bin
$ ./kafka-consumer-groups.sh --describe --group vertica-vmart \
--bootstrap-server localhost:9092
Note: This will not show information about old Zookeeper-based consumers.
Consumer group 'vertica-vmart' has no active members.
TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
web_hits 0 30000 30000 0 -
可以更改 Vertica 在使用消息时向其报告进度的使用者组。
使用调度程序时,可通过将 --consumer-group-id
实参设为 vkconfig 脚本的调度程序或微批处理实用程序来设置使用者组。例如,假设您希望设置调度程序中显示的示例调度程序向名为 vertica-database 的使用者组报告其使用情况。那么,可以使用以下命令:
$ /opt/vertica/packages/kafka/bin/vkconfig microbatch --update \
--conf weblog.conf --microbatch weblog --consumer-group-id vertica-database
当调度程序开始加载数据时,它将开始更新新的使用者组。使用 kafka-consumer-groups.sh
可以在 Kafka 节点上看到此新使用者组。
使用 --list
选项可返回使用者组:
$ /path/to/kafka/bin/kafka-consumer-groups.sh --list --bootstrap-server localhost:9092
Note: This will not show information about old Zookeeper-based consumers.
vertica-database
vertica-vmart
使用 --describe
和 --group
选项可返回有关特定使用者组的详细信息:
$ /path/to/kafka/bin/kafka-consumer-groups.sh --describe --group vertica-database \
--bootstrap-server localhost:9092
Note: This will not show information about old Zookeeper-based consumers.
Consumer group 'vertica-database' has no active members.
TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
web_hits 0 30300 30300 0 - - -
要在手动加载数据时更改使用者组,请使用 KafkaSource 函数的 group_id 参数:
=> COPY web_hits SOURCE KafkaSource(stream='web_hits|0|-2',
brokers='kafka01.example.com:9092',
stop_on_eof=True,
group_id='vertica_database')
PARSER KafkaJSONParser();
Rows Loaded
-------------
50000
(1 row)
可以选择让调度程序、手动加载或自定义加载脚本从使用者组的偏移量处开始加载消息。要从存储在使用者组中的最后偏移量处加载消息,请使用特殊的偏移量 -3。
要指示调度程序从使用者组已保存的偏移量处加载消息,请使用 vkconfig 脚本微批处理工具的 --offset
实参。
停止使用 shutdown 命令的调度程序以及用于创建调度程序的配置文件:
$ /opt/vertica/packages/kafka/bin/vkconfig microbatch shutdown --conf weblog.conf
将微批处理 --offset 选项设为 -3:
$ /opt/vertica/packages/kafka/bin/vkconfig microbatch --update --conf weblog.conf --microbatch weblog --offset -3
这会将调度程序从其读取的所有主题分区的偏移量设置为 -3。调度程序将采用使用者组已保存的偏移量开始下一次加载,并且所有后续加载均将使用保存在 stream_microbatch_history 中的偏移量。
以下示例将从 web_hits 主题加载消息,该主题具有包含 51,000 条消息的分区。有关使用 KafkaSource 进行手动加载的详细信息,请参阅手动使用来自 Kafka 的数据。
第一个 COPY 语句将创建名为 vertica_manual 的使用者组,并从 web_hits 主题中的第一个分区加载前 50,000 条消息:
=> COPY web_hits
SOURCE KafkaSource(stream='web_hits|0|0|50000',
brokers='kafka01.example.com:9092',
stop_on_eof=True,
group_id='vertica_manual')
PARSER KafkaJSONParser()
REJECTED DATA AS TABLE public.web_hits_rejections;
Rows Loaded
-------------
50000
(1 row)
下一个 COPY 语句将传递 -3 作为 start_offset 流参数,以便从使用者组已保存的偏移量处进行加载:
=> COPY web_hits
SOURCE KafkaSource(stream='web_hits|0|-3',
brokers='kafka01.example.com:9092',
stop_on_eof=True,
group_id='vertica_manual')
PARSER KafkaJSONParser()
REJECTED DATA AS TABLE public.web_hits_rejections;
Rows Loaded
-------------
1000
(1 row)
默认情况下,Vertica 会向 Kafka 报告其使用的消息的偏移量。如果没有专门为 Vertica 配置使用者组,它仍会向名为 vertica_database-name(其中 database- name 是 Vertica 当前正在运行的数据库的名称)的使用者组报告其偏移量。
如果要完全禁止 Vertica 向 Kafka 报告其使用情况,可以将使用者组设为空字符串或 NULL。例如:
=> COPY web_hits SOURCE KafkaSource(stream='web_hits|0|-2',
brokers='kafka01.example.com:9092',
stop_on_eof=True,
group_id=NULL)
PARSER KafkaJsonParser();
Rows Loaded
-------------
60000
(1 row)
vkconfig 工具有两个可帮助检查调度程序的配置并监控数据加载情况的功能:
配置调度程序(调度程序、群集、源、目标、加载规范和微批处理)的 vkconfig 工具具有 --read
实参,可让这些工具在调度程序中输出其当前设置。
vkconfig 统计信息工具可用于获取有关微批处理的统计信息。根据日期和时间范围、群集、分区及其他条件,可以筛选微批处理记录。
上述两个功能均以 JSON 格式输出数据。可以使用能够使用 JSON 数据的第三方工具或编写自己的脚本来处理配置和统计信息数据。
此外,还可以通过查询调度程序架构中的配置表来访问这些 vkconfig 选项提供的数据。但是,您可能会发现这些选项变得更加易于使用,因为它们不需要您连接到 Vertica 数据库。
将 --read
选项传递到 vkconfig 的配置工具即可获取该工具可以设置的选项的当前设置。此输出采用 JSON 格式。以下示例演示了如何为 weblog.conf 配置文件中定义的调度程序从调度程序和群集工具中获取配置信息:
$ vkconfig scheduler --read --conf weblog.conf
{"version":"v9.2.0", "frame_duration":"00:00:10", "resource_pool":"weblog_pool",
"config_refresh":"00:05:00", "new_source_policy":"FAIR",
"pushback_policy":"LINEAR", "pushback_max_count":5, "auto_sync":true,
"consumer_group_id":null}
$ vkconfig cluster --read --conf weblog.conf
{"cluster":"kafka_weblog", "hosts":"kafak01.example.com:9092,kafka02.example.com:9092"}
--read
选项将列出该工具在调度程序架构中创建的所有值。例如,如果已在调度程序中定义多个目标,则 --read
选项会列出所有目标。
$ vkconfig target --list --conf weblog.conf
{"target_schema":"public", "target_table":"health_data"}
{"target_schema":"public", "target_table":"iot_data"}
{"target_schema":"public", "target_table":"web_hits"}
可以使用 vkconfig 工具接受的其他实参来筛选 --read
选项输出。例如,在群集工具中,可以使用 --host
实参将输出限制为仅显示包含特定主机的群集。这些实参支持 LIKE 谓词通配符,因此可以匹配部分值。有关使用通配符的详细信息,请参阅 LIKE 谓词。
以下示例演示了如何使用 --host
实参来筛选群集工具的 --read
选项的输出。第一次调用显示未经筛选的输出。第二次调用可筛选输出,以仅显示以“kafka”开头的群集:
$ vkconfig cluster --read --conf weblog.conf
{"cluster":"some_cluster", "hosts":"host01.example.com"}
{"cluster":"iot_cluster",
"hosts":"kafka-iot01.example.com:9092,kafka-iot02.example.com:9092"}
{"cluster":"weblog",
"hosts":"web01.example.com.com:9092,web02.example.com:9092"}
{"cluster":"streamcluster1",
"hosts":"kafka-a-01.example.com:9092,kafka-a-02.example.com:9092"}
{"cluster":"test_cluster",
"hosts":"test01.example.com:9092,test02.example.com:9092"}
$ vkconfig cluster --read --conf weblog.conf --hosts kafka%
{"cluster":"iot_cluster",
"hosts":"kafka-iot01.example.com:9092,kafka-iot02.example.com:9092"}
{"cluster":"streamcluster1",
"hosts":"kafka-a-01.example.com:9092,kafka-a-02.example.com:9092"}
有关详细信息,请参阅群集工具选项、加载规范工具选项、微批处理工具选项、调度程序工具选项、目标工具选项和源工具选项。
vkconfig 脚本的统计信息工具可用于查看调度程序微批处理的历史记录。可以使用以下条件的任意组合来筛选结果:
微批处理的名称
作为数据加载源的 Kafka 群集
主题的名称
主题内的分区
数据加载所针对的 Vertica 架构和表
日期和时间范围
最新的微批处理
有关此工具中提供的所有选项,请参阅统计信息工具选项。
以下示例将获取调度程序运行的最后两个微批处理:
$ vkconfig statistics --last 2 --conf weblog.conf
{"microbatch":"weblog", "target_schema":"public", "target_table":"web_hits",
"source_name":"web_hits", "source_cluster":"kafka_weblog", "source_partition":0,
"start_offset":73300, "end_offset":73399, "end_reason":"END_OF_STREAM",
"end_reason_message":null, "partition_bytes":19588, "partition_messages":100,
"timeslice":"00:00:09.807000", "batch_start":"2018-11-02 13:22:07.825295",
"batch_end":"2018-11-02 13:22:08.135299", "source_duration":"00:00:00.219619",
"consecutive_error_count":null, "transaction_id":45035996273976123,
"frame_start":"2018-11-02 13:22:07.601", "frame_end":null}
{"microbatch":"weblog", "target_schema":"public", "target_table":"web_hits",
"source_name":"web_hits", "source_cluster":"kafka_weblog", "source_partition":0,
"start_offset":73200, "end_offset":73299, "end_reason":"END_OF_STREAM",
"end_reason_message":null, "partition_bytes":19781, "partition_messages":100,
"timeslice":"00:00:09.561000", "batch_start":"2018-11-02 13:21:58.044698",
"batch_end":"2018-11-02 13:21:58.335431", "source_duration":"00:00:00.214868",
"consecutive_error_count":null, "transaction_id":45035996273976095,
"frame_start":"2018-11-02 13:21:57.561", "frame_end":null}
以下示例将从名为 web_hits 的源获取介于 2018 年 11 月 2 日 13:21:00 到 13:21:20 之间的微批处理:
$ vkconfig statistics --source "web_hits" --from-timestamp \
"2018-11-02 13:21:00" --to-timestamp "2018-11-02 13:21:20" \
--conf weblog.conf
{"microbatch":"weblog", "target_schema":"public", "target_table":"web_hits",
"source_name":"web_hits", "source_cluster":"kafka_weblog", "source_partition":0,
"start_offset":72800, "end_offset":72899, "end_reason":"END_OF_STREAM",
"end_reason_message":null, "partition_bytes":19989, "partition_messages":100,
"timeslice":"00:00:09.778000", "batch_start":"2018-11-02 13:21:17.581606",
"batch_end":"2018-11-02 13:21:18.850705", "source_duration":"00:00:01.215751",
"consecutive_error_count":null, "transaction_id":45035996273975997,
"frame_start":"2018-11-02 13:21:17.34", "frame_end":null}
{"microbatch":"weblog", "target_schema":"public", "target_table":"web_hits",
"source_name":"web_hits", "source_cluster":"kafka_weblog", "source_partition":0,
"start_offset":72700, "end_offset":72799, "end_reason":"END_OF_STREAM",
"end_reason_message":null, "partition_bytes":19640, "partition_messages":100,
"timeslice":"00:00:09.857000", "batch_start":"2018-11-02 13:21:07.470834",
"batch_end":"2018-11-02 13:21:08.737255", "source_duration":"00:00:01.218932",
"consecutive_error_count":null, "transaction_id":45035996273975978,
"frame_start":"2018-11-02 13:21:07.309", "frame_end":null}
有关使用此工具的更多示例,请参阅统计信息工具选项。