这是本节的多页打印视图。 点击此处打印.

返回本页常规视图.

监控消息使用情况

可以通过多种方式监控来自 Kafka 的数据流式传输的进度:

  • 监控 Vertica 向其报告进度的使用者组。如果要用于监控数据加载情况的工具能够与 Kafka 协调使用,那么这种技术最好。

  • 使用 vkconfig 工具中内置的监控 API。这些 API 以 JSON 格式报告流式传输调度程序的配置和使用情况。如果您正在开发自己的监控脚本,或者您的监控工具可以使用 JSON 格式的状态信息,这些 API 会非常有用。

1 - 通过使用者组监控 Vertica 消息使用情况

Apache Kafka 具有名为使用者组的功能,可帮助在多组使用者之间分配消息使用负载。采用使用者组时,Kafka 会根据组中的使用者数量来平均分配消息。使用者会向 Kafka 代理报告已成功读取的消息。此报告可帮助 Kafka 管理主题分区中的消息偏移量,以便不会向组中的任何使用者发送两次相同的消息。

在管理负载分配或防止重复加载消息方面,Vertica 不依赖于 Kafka 的使用者组。流式传输作业调度程序会自行管理主题分区偏移量。

即使 Vertica 不需要使用者组来管理偏移量,它也会向 Kafka 代理报告已使用的消息。借助此功能,您可以在 Vertica 群集加载消息时使用第三方工具监控其进度。默认情况下,Vertica 会向名为 vertica-databaseName(其中 databaseName 是 Vertica 数据库的名称)的使用者组报告其进度。在定义调度程序时或手动加载数据期间,可以更改 Vertica 向其报告进度的使用者组的名称。在加载数据时,第三方工具可以让 Kafka 代理监控 Vertica 群集的进度。

例如,可以使用 Kafka 的 kafka-consumer-groups.sh 脚本(位于 Kafka 安装的 bin 目录中)查看 Vertica 使用者组的状态。以下示例演示了如何列出 Kafka 群集中定义的可用使用者组并显示 Vertica 使用者组的详细信息:

$ cd /path/to/kafka/bin
$ ./kafka-consumer-groups.sh --list --bootstrap-server localhost:9092
Note: This will not show information about old Zookeeper-based consumers.

vertica-vmart
$ ./kafka-consumer-groups.sh --describe --group vertica-vmart \
   --bootstrap-server localhost:9092
Note: This will not show information about old Zookeeper-based consumers.

Consumer group 'vertica-vmart' has no active members.

TOPIC                          PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG        CONSUMER-ID                                       HOST                           CLIENT-ID
web_hits                       0          24500           30000           5500       -                                                 -                              -

从输出中,可以看到 Vertica 向 vertica-vmart 使用者组报告消息使用情况。此组是 Vertica 加载示例 VMart 数据库时的默认使用者组。第二个命令列出了 vertica-vmart 使用者组正在使用的主题。可以看到 Vertica 群集已读取主题唯一分区中的 24500 条消息(总共 30000 条)。稍后,运行相同的命令将显示 Vertica 群集的进度:

$ cd /path/to/kafka/bin
$ ./kafka-consumer-groups.sh --describe --group vertica-vmart \
    --bootstrap-server localhost:9092
Note: This will not show information about old Zookeeper-based consumers.

Consumer group 'vertica-vmart' has no active members.

TOPIC                          PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG        CONSUMER-ID                                       HOST                           CLIENT-ID
web_hits                       0          30000           30000           0          -

更改 Vertica 向其报告进度的使用者组

可以更改 Vertica 在使用消息时向其报告进度的使用者组。

使用调度程序自动加载时进行更改

使用调度程序时,可通过将 --consumer-group-id 实参设为 vkconfig 脚本的调度程序微批处理实用程序来设置使用者组。例如,假设您希望设置调度程序中显示的示例调度程序向名为 vertica-database 的使用者组报告其使用情况。那么,可以使用以下命令:

$ /opt/vertica/packages/kafka/bin/vkconfig microbatch --update \
    --conf weblog.conf --microbatch weblog --consumer-group-id vertica-database

当调度程序开始加载数据时,它将开始更新新的使用者组。使用 kafka-consumer-groups.sh 可以在 Kafka 节点上看到此新使用者组。

使用 --list 选项可返回使用者组:

$ /path/to/kafka/bin/kafka-consumer-groups.sh --list --bootstrap-server localhost:9092
Note: This will not show information about old Zookeeper-based consumers.

vertica-database
vertica-vmart

使用 --describe--group 选项可返回有关特定使用者组的详细信息:

$ /path/to/kafka/bin/kafka-consumer-groups.sh --describe --group vertica-database \
                                          --bootstrap-server localhost:9092
Note: This will not show information about old Zookeeper-based consumers.

Consumer group 'vertica-database' has no active members.

TOPIC                          PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG        CONSUMER-ID                                       HOST                           CLIENT-ID
web_hits                       0          30300           30300           0          -                                                 -                              -

手动加载时进行更改

要在手动加载数据时更改使用者组,请使用 KafkaSource 函数的 group_id 参数:

=> COPY web_hits SOURCE KafkaSource(stream='web_hits|0|-2',
                                    brokers='kafka01.example.com:9092',
                                    stop_on_eof=True,
                                    group_id='vertica_database')
                 PARSER KafkaJSONParser();
 Rows Loaded
-------------
       50000
(1 row)

加载消息时采用使用者组偏移量

可以选择让调度程序、手动加载或自定义加载脚本从使用者组的偏移量处开始加载消息。要从存储在使用者组中的最后偏移量处加载消息,请使用特殊的偏移量 -3。

使用调度程序自动加载的示例

要指示调度程序从使用者组已保存的偏移量处加载消息,请使用 vkconfig 脚本微批处理工具的 --offset 实参。

  1. 停止使用 shutdown 命令的调度程序以及用于创建调度程序的配置文件:

    $ /opt/vertica/packages/kafka/bin/vkconfig microbatch shutdown --conf weblog.conf
    
  2. 将微批处理 --offset 选项设为 -3:

    $ /opt/vertica/packages/kafka/bin/vkconfig microbatch --update --conf weblog.conf --microbatch weblog --offset -3
    

这会将调度程序从其读取的所有主题分区的偏移量设置为 -3。调度程序将采用使用者组已保存的偏移量开始下一次加载,并且所有后续加载均将使用保存在 stream_microbatch_history 中的偏移量。

手动加载示例

以下示例将从 web_hits 主题加载消息,该主题具有包含 51,000 条消息的分区。有关使用 KafkaSource 进行手动加载的详细信息,请参阅手动使用来自 Kafka 的数据

  1. 第一个 COPY 语句将创建名为 vertica_manual 的使用者组,并从 web_hits 主题中的第一个分区加载前 50,000 条消息:

    => COPY web_hits
       SOURCE KafkaSource(stream='web_hits|0|0|50000',
                                  brokers='kafka01.example.com:9092',
                                  stop_on_eof=True,
                                  group_id='vertica_manual')
       PARSER KafkaJSONParser()
       REJECTED DATA AS TABLE public.web_hits_rejections;
     Rows Loaded
    -------------
           50000
    (1 row)
    
  2. 下一个 COPY 语句将传递 -3 作为 start_offset 流参数,以便从使用者组已保存的偏移量处进行加载:

    => COPY web_hits
       SOURCE KafkaSource(stream='web_hits|0|-3',
                                  brokers='kafka01.example.com:9092',
                                  stop_on_eof=True,
                                  group_id='vertica_manual')
       PARSER KafkaJSONParser()
       REJECTED DATA AS TABLE public.web_hits_rejections;
     Rows Loaded
    -------------
            1000
    (1 row)
    

禁用使用者组报告

默认情况下,Vertica 会向 Kafka 报告其使用的消息的偏移量。如果没有专门为 Vertica 配置使用者组,它仍会向名为 vertica_database-name(其中 database- name 是 Vertica 当前正在运行的数据库的名称)的使用者组报告其偏移量。

如果要完全禁止 Vertica 向 Kafka 报告其使用情况,可以将使用者组设为空字符串或 NULL。例如:

=> COPY web_hits SOURCE KafkaSource(stream='web_hits|0|-2',
                                    brokers='kafka01.example.com:9092',
                                    stop_on_eof=True,
                                    group_id=NULL)
                 PARSER KafkaJsonParser();
 Rows Loaded
-------------
       60000
(1 row)

2 - 从 vkconfig 获取配置和统计信息

vkconfig 工具有两个可帮助检查调度程序的配置并监控数据加载情况的功能:

  • 配置调度程序(调度程序、群集、源、目标、加载规范和微批处理)的 vkconfig 工具具有 --read 实参,可让这些工具在调度程序中输出其当前设置。

  • vkconfig 统计信息工具可用于获取有关微批处理的统计信息。根据日期和时间范围、群集、分区及其他条件,可以筛选微批处理记录。

上述两个功能均以 JSON 格式输出数据。可以使用能够使用 JSON 数据的第三方工具或编写自己的脚本来处理配置和统计信息数据。

此外,还可以通过查询调度程序架构中的配置表来访问这些 vkconfig 选项提供的数据。但是,您可能会发现这些选项变得更加易于使用,因为它们不需要您连接到 Vertica 数据库。

获取配置信息

--read 选项传递到 vkconfig 的配置工具即可获取该工具可以设置的选项的当前设置。此输出采用 JSON 格式。以下示例演示了如何为 weblog.conf 配置文件中定义的调度程序从调度程序和群集工具中获取配置信息:

$ vkconfig scheduler --read --conf weblog.conf
{"version":"v9.2.0", "frame_duration":"00:00:10", "resource_pool":"weblog_pool",
 "config_refresh":"00:05:00", "new_source_policy":"FAIR",
 "pushback_policy":"LINEAR", "pushback_max_count":5, "auto_sync":true,
 "consumer_group_id":null}

$ vkconfig cluster --read --conf weblog.conf
{"cluster":"kafka_weblog", "hosts":"kafak01.example.com:9092,kafka02.example.com:9092"}

--read 选项将列出该工具在调度程序架构中创建的所有值。例如,如果已在调度程序中定义多个目标,则 --read 选项会列出所有目标。

$ vkconfig target --list --conf weblog.conf
{"target_schema":"public", "target_table":"health_data"}
{"target_schema":"public", "target_table":"iot_data"}
{"target_schema":"public", "target_table":"web_hits"}

可以使用 vkconfig 工具接受的其他实参来筛选 --read 选项输出。例如,在群集工具中,可以使用 --host 实参将输出限制为仅显示包含特定主机的群集。这些实参支持 LIKE 谓词通配符,因此可以匹配部分值。有关使用通配符的详细信息,请参阅 LIKE 谓词

以下示例演示了如何使用 --host 实参来筛选群集工具的 --read 选项的输出。第一次调用显示未经筛选的输出。第二次调用可筛选输出,以仅显示以“kafka”开头的群集:

$ vkconfig cluster --read --conf weblog.conf
{"cluster":"some_cluster", "hosts":"host01.example.com"}
{"cluster":"iot_cluster",
 "hosts":"kafka-iot01.example.com:9092,kafka-iot02.example.com:9092"}
{"cluster":"weblog",
 "hosts":"web01.example.com.com:9092,web02.example.com:9092"}
{"cluster":"streamcluster1",
 "hosts":"kafka-a-01.example.com:9092,kafka-a-02.example.com:9092"}
{"cluster":"test_cluster",
 "hosts":"test01.example.com:9092,test02.example.com:9092"}

$ vkconfig cluster --read --conf weblog.conf --hosts kafka%
{"cluster":"iot_cluster",
 "hosts":"kafka-iot01.example.com:9092,kafka-iot02.example.com:9092"}
{"cluster":"streamcluster1",
 "hosts":"kafka-a-01.example.com:9092,kafka-a-02.example.com:9092"}

有关详细信息,请参阅群集工具选项加载规范工具选项微批处理工具选项调度程序工具选项目标工具选项源工具选项

获取流式传输数据加载统计信息

vkconfig 脚本的统计信息工具可用于查看调度程序微批处理的历史记录。可以使用以下条件的任意组合来筛选结果:

  • 微批处理的名称

  • 作为数据加载源的 Kafka 群集

  • 主题的名称

  • 主题内的分区

  • 数据加载所针对的 Vertica 架构和表

  • 日期和时间范围

  • 最新的微批处理

有关此工具中提供的所有选项,请参阅统计信息工具选项

以下示例将获取调度程序运行的最后两个微批处理:

$ vkconfig statistics --last 2 --conf weblog.conf
{"microbatch":"weblog", "target_schema":"public", "target_table":"web_hits",
 "source_name":"web_hits", "source_cluster":"kafka_weblog", "source_partition":0,
 "start_offset":73300, "end_offset":73399, "end_reason":"END_OF_STREAM",
 "end_reason_message":null, "partition_bytes":19588, "partition_messages":100,
 "timeslice":"00:00:09.807000", "batch_start":"2018-11-02 13:22:07.825295",
 "batch_end":"2018-11-02 13:22:08.135299", "source_duration":"00:00:00.219619",
 "consecutive_error_count":null, "transaction_id":45035996273976123,
 "frame_start":"2018-11-02 13:22:07.601", "frame_end":null}
{"microbatch":"weblog", "target_schema":"public", "target_table":"web_hits",
 "source_name":"web_hits", "source_cluster":"kafka_weblog", "source_partition":0,
 "start_offset":73200, "end_offset":73299, "end_reason":"END_OF_STREAM",
 "end_reason_message":null, "partition_bytes":19781, "partition_messages":100,
 "timeslice":"00:00:09.561000", "batch_start":"2018-11-02 13:21:58.044698",
 "batch_end":"2018-11-02 13:21:58.335431", "source_duration":"00:00:00.214868",
 "consecutive_error_count":null, "transaction_id":45035996273976095,
 "frame_start":"2018-11-02 13:21:57.561", "frame_end":null}

以下示例将从名为 web_hits 的源获取介于 2018 年 11 月 2 日 13:21:00 到 13:21:20 之间的微批处理:

$ vkconfig statistics --source "web_hits" --from-timestamp \
           "2018-11-02 13:21:00" --to-timestamp "2018-11-02 13:21:20"  \
           --conf weblog.conf
{"microbatch":"weblog", "target_schema":"public", "target_table":"web_hits",
 "source_name":"web_hits", "source_cluster":"kafka_weblog", "source_partition":0,
 "start_offset":72800, "end_offset":72899, "end_reason":"END_OF_STREAM",
 "end_reason_message":null, "partition_bytes":19989, "partition_messages":100,
 "timeslice":"00:00:09.778000", "batch_start":"2018-11-02 13:21:17.581606",
 "batch_end":"2018-11-02 13:21:18.850705", "source_duration":"00:00:01.215751",
 "consecutive_error_count":null, "transaction_id":45035996273975997,
 "frame_start":"2018-11-02 13:21:17.34", "frame_end":null}
{"microbatch":"weblog", "target_schema":"public", "target_table":"web_hits",
 "source_name":"web_hits", "source_cluster":"kafka_weblog", "source_partition":0,
 "start_offset":72700, "end_offset":72799, "end_reason":"END_OF_STREAM",
 "end_reason_message":null, "partition_bytes":19640, "partition_messages":100,
 "timeslice":"00:00:09.857000", "batch_start":"2018-11-02 13:21:07.470834",
 "batch_end":"2018-11-02 13:21:08.737255", "source_duration":"00:00:01.218932",
 "consecutive_error_count":null, "transaction_id":45035996273975978,
 "frame_start":"2018-11-02 13:21:07.309", "frame_end":null}

有关使用此工具的更多示例,请参阅统计信息工具选项