1 - Monitoring Vertica message consumption with consumer groups
Apache Kafka has a feature named consumer groups that helps distribute message consumption loads across sets of consumers. When using consumer groups, Kafka evenly divides up messages based on the number of consumers in the group. Consumers report back to the Kafka broker which messages they read successfully. This reporting helps Kafka manage message offsets in the topic's partitions, so that no consumer in the group is sent the same message twice.
Vertica does not rely on Kafka's consumer groups to manage load distribution or to prevent duplicate loading of messages. The streaming job scheduler manages topic partition offsets on its own.
Even though Vertica does not need consumer groups to manage offsets, it does report back to the Kafka brokers which messages it consumed. This feature lets you use third-party tools to monitor the Vertica cluster's progress as it loads messages. By default, Vertica reports its progress to a consumer group named vertica-databaseName, where databaseName is the name of the Vertica database. You can change the name of the consumer group that Vertica reports its progress to when defining a scheduler or during manual loads of data. Third-party tools can query the Kafka brokers to monitor the Vertica cluster's progress when loading data.
For example, you can use Kafka's kafka-consumer-groups.sh script (located in the bin directory of your Kafka installation) to view the status of the Vertica consumer group. The following example demonstrates listing the consumer groups defined in the Kafka cluster and showing the details of the Vertica consumer group:
$ cd /path/to/kafka/bin
$ ./kafka-consumer-groups.sh --list --bootstrap-server localhost:9092
Note: This will not show information about old Zookeeper-based consumers.
vertica-vmart
$ ./kafka-consumer-groups.sh --describe --group vertica-vmart \
--bootstrap-server localhost:9092
Note: This will not show information about old Zookeeper-based consumers.
Consumer group 'vertica-vmart' has no active members.
TOPIC     PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG   CONSUMER-ID  HOST  CLIENT-ID
web_hits  0          24500           30000           5500  -            -     -
From the output, you can see that Vertica reports its consumption of messages back to the vertica-vmart consumer group, the default consumer group name when the example VMart database is running. The second command shows the details of the topics being consumed by the vertica-vmart consumer group. You can see that the Vertica cluster has read 24500 of the 30000 messages in the topic's only partition. Running the same command later shows the Vertica cluster's progress:
$ cd /path/to/kafka/bin
$ ./kafka-consumer-groups.sh --describe --group vertica-vmart \
--bootstrap-server localhost:9092
Note: This will not show information about old Zookeeper-based consumers.
Consumer group 'vertica-vmart' has no active members.
TOPIC     PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG   CONSUMER-ID  HOST  CLIENT-ID
web_hits  0          30000           30000           0     -            -     -
Changing the consumer group where Vertica reports its progress
You can change the consumer group that Vertica reports its progress to when consuming messages.
Changing for automatic loads with the scheduler
When using a scheduler, you set the consumer group by passing the --consumer-group-id argument to the vkconfig script's scheduler or microbatch utilities. For example, suppose you want the example scheduler shown in Setting up a scheduler to report its consumption to a consumer group named vertica-database. Then you could use the command:
$ /opt/vertica/packages/kafka/bin/vkconfig microbatch --update \
--conf weblog.conf --microbatch weblog --consumer-group-id vertica-database
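Because the scheduler utility accepts the same --consumer-group-id argument, you could instead set the group once at the scheduler level rather than per microbatch. A minimal sketch, assuming the same weblog.conf configuration file:
$ /opt/vertica/packages/kafka/bin/vkconfig scheduler --update \
    --conf weblog.conf --consumer-group-id vertica-database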
When the scheduler begins loading data, it starts updating the new consumer group. You can see this on a Kafka node using kafka-consumer-groups.sh.
Use the --list option to return the consumer groups:
$ /path/to/kafka/bin/kafka-consumer-groups.sh --list --bootstrap-server localhost:9092
Note: This will not show information about old Zookeeper-based consumers.
vertica-database
vertica-vmart
Use the --describe and --group options to return details about a specific consumer group:
$ /path/to/kafka/bin/kafka-consumer-groups.sh --describe --group vertica-database \
--bootstrap-server localhost:9092
Note: This will not show information about old Zookeeper-based consumers.
Consumer group 'vertica-database' has no active members.
TOPIC     PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG   CONSUMER-ID  HOST  CLIENT-ID
web_hits  0          30300           30300           0     -            -     -
Changing for manual loads
To change the consumer group when manually loading data, use the group_id parameter of the KafkaSource function:
=> COPY web_hits SOURCE KafkaSource(stream='web_hits|0|-2',
brokers='kafka01.example.com:9092',
stop_on_eof=True,
group_id='vertica_database')
PARSER KafkaJSONParser();
Rows Loaded
-------------
50000
(1 row)
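As with the scheduler examples above, you can confirm that Vertica is committing its offsets to the new group by describing it on a Kafka node. A sketch, assuming the same Kafka installation path and broker address as the earlier examples:
$ /path/to/kafka/bin/kafka-consumer-groups.sh --describe --group vertica_database \
    --bootstrap-server localhost:9092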
Using consumer group offsets when loading messages
You can choose to have your scheduler, manual load, or custom loading script start loading messages from the consumer group's offset. To load messages from the last offset stored in the consumer group, use the special -3 offset.
Automatic load with the scheduler example
To instruct your scheduler to load messages from the consumer group's saved offset, use the vkconfig script microbatch tool's --offset argument.
- Stop the scheduler using the shutdown command and the configuration file that you used to create the scheduler:
$ /opt/vertica/packages/kafka/bin/vkconfig shutdown --conf weblog.conf
- Set the microbatch --offset option to -3:
$ /opt/vertica/packages/kafka/bin/vkconfig microbatch --update --conf weblog.conf --microbatch weblog --offset -3
This sets the offset to -3 for all topic partitions that your scheduler reads from. The scheduler begins the next load with the consumer group's saved offset, and all subsequent loads use the offset saved in stream_microbatch_history.
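After updating the offset, restart the scheduler so it resumes loading. A minimal sketch using the vkconfig launch command with the same configuration file, run in the background as is typical for a long-running scheduler:
$ nohup /opt/vertica/packages/kafka/bin/vkconfig launch --conf weblog.conf \
    >/dev/null 2>&1 &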
Manual load example
This example loads messages from the web_hits topic that has one partition consisting of 51,000 messages. For details about manual loads with KafkaSource, see Manually consume data from Kafka.
- The first COPY statement creates a consumer group named vertica_manual, and loads the first 50,000 messages from the first partition in the web_hits topic:
=> COPY web_hits
SOURCE KafkaSource(stream='web_hits|0|0|50000',
brokers='kafka01.example.com:9092',
stop_on_eof=True,
group_id='vertica_manual')
PARSER KafkaJSONParser()
REJECTED DATA AS TABLE public.web_hits_rejections;
Rows Loaded
-------------
50000
(1 row)
- The next COPY statement passes -3 as the start_offset stream parameter to load from the consumer group's saved offset:
=> COPY web_hits
SOURCE KafkaSource(stream='web_hits|0|-3',
brokers='kafka01.example.com:9092',
stop_on_eof=True,
group_id='vertica_manual')
PARSER KafkaJSONParser()
REJECTED DATA AS TABLE public.web_hits_rejections;
Rows Loaded
-------------
1000
(1 row)
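At this point, the group's saved offset should be at the end of the partition (offset 51000, since all 51,000 messages have been consumed). You can verify this from a Kafka node; a sketch assuming the same paths as the earlier examples:
$ /path/to/kafka/bin/kafka-consumer-groups.sh --describe --group vertica_manual \
    --bootstrap-server localhost:9092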
Disabling consumer group reporting
Vertica reports the offsets of the messages it consumes to Kafka by default. If you do not specifically configure a consumer group for Vertica, it still reports its offsets to the default consumer group named vertica-databaseName (where databaseName is the name of the Vertica database).
If you want to completely disable having Vertica report its consumption back to Kafka, set the consumer group to an empty string or NULL. For example:
=> COPY web_hits SOURCE KafkaSource(stream='web_hits|0|-2',
brokers='kafka01.example.com:9092',
stop_on_eof=True,
group_id=NULL)
PARSER KafkaJSONParser();
Rows Loaded
-------------
60000
(1 row)
2 - Getting configuration and statistics information from vkconfig
The vkconfig tool has two features that help you examine your scheduler's configuration and monitor your data load:
- The vkconfig tools that configure your scheduler (scheduler, cluster, source, target, load-spec, and microbatch) have a --read argument that makes them output their current settings in the scheduler.
- The vkconfig statistics tool lets you get statistics on your microbatches. You can filter the microbatch records based on a date and time range, cluster, partition, and other criteria.
Both of these features output their data in JSON format. You can use third-party tools that can consume JSON data or write your own scripts to process the configuration and statistics data.
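For example, a short shell pipeline can summarize the JSON records. This sketch assumes the jq utility is installed, and tallies the end reasons of the last 100 microbatch records:
$ vkconfig statistics --last 100 --conf weblog.conf | jq -r '.end_reason' | sort | uniq -c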
You can also access the data provided by these vkconfig options by querying the configuration tables in the scheduler's schema. However, you may find these options easier to use as they do not require you to connect to the Vertica database.
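For example, the cluster definitions that the cluster tool's --read option returns also live in the scheduler schema's stream_clusters table. A minimal sketch of the equivalent query, assuming your scheduler's schema is named weblog_sched (the actual name is whatever you set when creating the scheduler):
=> SELECT cluster, hosts FROM weblog_sched.stream_clusters;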
You pass the --read option to vkconfig's configuration tools to get the current settings for the options that the tool can set. This output is in JSON format. This example demonstrates getting the configuration information from the scheduler and cluster tools for the scheduler defined in the weblog.conf configuration file:
$ vkconfig scheduler --read --conf weblog.conf
{"version":"v9.2.0", "frame_duration":"00:00:10", "resource_pool":"weblog_pool",
"config_refresh":"00:05:00", "new_source_policy":"FAIR",
"pushback_policy":"LINEAR", "pushback_max_count":5, "auto_sync":true,
"consumer_group_id":null}
$ vkconfig cluster --read --conf weblog.conf
{"cluster":"kafka_weblog", "hosts":"kafak01.example.com:9092,kafka02.example.com:9092"}
The --read option lists all of the values created by the tool in the scheduler schema. For example, if you have defined multiple targets in your scheduler, the --read option lists all of them:
$ vkconfig target --read --conf weblog.conf
{"target_schema":"public", "target_table":"health_data"}
{"target_schema":"public", "target_table":"iot_data"}
{"target_schema":"public", "target_table":"web_hits"}
You can filter the --read option output using the other arguments that the vkconfig tools accept. For example, in the cluster tool, you can use the --hosts argument to limit the output to clusters that contain a specific host. These arguments support LIKE-predicate wildcards, so you can match partial values. See LIKE for more information about using wildcards.
The following example demonstrates how you can filter the output of the cluster tool's --read option using the --hosts argument. The first call shows the unfiltered output. The second call filters the output to show only those clusters with hosts that start with "kafka":
$ vkconfig cluster --read --conf weblog.conf
{"cluster":"some_cluster", "hosts":"host01.example.com"}
{"cluster":"iot_cluster",
"hosts":"kafka-iot01.example.com:9092,kafka-iot02.example.com:9092"}
{"cluster":"weblog",
"hosts":"web01.example.com.com:9092,web02.example.com:9092"}
{"cluster":"streamcluster1",
"hosts":"kafka-a-01.example.com:9092,kafka-a-02.example.com:9092"}
{"cluster":"test_cluster",
"hosts":"test01.example.com:9092,test02.example.com:9092"}
$ vkconfig cluster --read --conf weblog.conf --hosts kafka%
{"cluster":"iot_cluster",
"hosts":"kafka-iot01.example.com:9092,kafka-iot02.example.com:9092"}
{"cluster":"streamcluster1",
"hosts":"kafka-a-01.example.com:9092,kafka-a-02.example.com:9092"}
See the Cluster tool options, Load spec tool options, Microbatch tool options, Scheduler tool options, Target tool options, and Source tool options for more information.
Getting streaming data load statistics
The vkconfig script's statistics tool lets you view the history of your scheduler's microbatches. You can filter the results using any combination of the following criteria:
- The name of the microbatch
- The Kafka cluster that was the source of the data load
- The name of the topic
- The partition within the topic
- The Vertica schema and table targeted by the data load
- A date and time range
- The latest microbatches
See Statistics tool options for all of the options available in this tool.
This example gets the last two microbatches that the scheduler ran:
$ vkconfig statistics --last 2 --conf weblog.conf
{"microbatch":"weblog", "target_schema":"public", "target_table":"web_hits",
"source_name":"web_hits", "source_cluster":"kafka_weblog", "source_partition":0,
"start_offset":73300, "end_offset":73399, "end_reason":"END_OF_STREAM",
"end_reason_message":null, "partition_bytes":19588, "partition_messages":100,
"timeslice":"00:00:09.807000", "batch_start":"2018-11-02 13:22:07.825295",
"batch_end":"2018-11-02 13:22:08.135299", "source_duration":"00:00:00.219619",
"consecutive_error_count":null, "transaction_id":45035996273976123,
"frame_start":"2018-11-02 13:22:07.601", "frame_end":null}
{"microbatch":"weblog", "target_schema":"public", "target_table":"web_hits",
"source_name":"web_hits", "source_cluster":"kafka_weblog", "source_partition":0,
"start_offset":73200, "end_offset":73299, "end_reason":"END_OF_STREAM",
"end_reason_message":null, "partition_bytes":19781, "partition_messages":100,
"timeslice":"00:00:09.561000", "batch_start":"2018-11-02 13:21:58.044698",
"batch_end":"2018-11-02 13:21:58.335431", "source_duration":"00:00:00.214868",
"consecutive_error_count":null, "transaction_id":45035996273976095,
"frame_start":"2018-11-02 13:21:57.561", "frame_end":null}
This example gets the microbatches from the source named web_hits between 13:21:00 and 13:21:20 on November 2nd 2018:
$ vkconfig statistics --source "web_hits" --from-timestamp \
"2018-11-02 13:21:00" --to-timestamp "2018-11-02 13:21:20" \
--conf weblog.conf
{"microbatch":"weblog", "target_schema":"public", "target_table":"web_hits",
"source_name":"web_hits", "source_cluster":"kafka_weblog", "source_partition":0,
"start_offset":72800, "end_offset":72899, "end_reason":"END_OF_STREAM",
"end_reason_message":null, "partition_bytes":19989, "partition_messages":100,
"timeslice":"00:00:09.778000", "batch_start":"2018-11-02 13:21:17.581606",
"batch_end":"2018-11-02 13:21:18.850705", "source_duration":"00:00:01.215751",
"consecutive_error_count":null, "transaction_id":45035996273975997,
"frame_start":"2018-11-02 13:21:17.34", "frame_end":null}
{"microbatch":"weblog", "target_schema":"public", "target_table":"web_hits",
"source_name":"web_hits", "source_cluster":"kafka_weblog", "source_partition":0,
"start_offset":72700, "end_offset":72799, "end_reason":"END_OF_STREAM",
"end_reason_message":null, "partition_bytes":19640, "partition_messages":100,
"timeslice":"00:00:09.857000", "batch_start":"2018-11-02 13:21:07.470834",
"batch_end":"2018-11-02 13:21:08.737255", "source_duration":"00:00:01.218932",
"consecutive_error_count":null, "transaction_id":45035996273975978,
"frame_start":"2018-11-02 13:21:07.309", "frame_end":null}
See Statistics tool options for more examples of using this tool.
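As a further sketch, the filter criteria listed earlier map to statistics tool arguments. Assuming the option names --partition and --target-table from the Statistics tool options reference, you could narrow the history to one partition of a single target table:
$ vkconfig statistics --partition 0 --target-table web_hits --conf weblog.conf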