Monitoring Vertica message consumption with consumer groups

Apache Kafka has a feature named consumer groups that helps distribute message consumption loads across sets of consumers. When you use consumer groups, Kafka divides messages evenly among the consumers in the group. Each consumer reports back to the Kafka broker which messages it read successfully. This reporting helps Kafka manage message offsets in the topic's partitions, so that no consumer in the group is sent the same message twice.

Vertica does not rely on Kafka's consumer groups to manage load distribution or to prevent duplicate loads of messages. The streaming job scheduler manages topic partition offsets on its own.

Even though Vertica does not need consumer groups to manage offsets, it does report back to the Kafka brokers which messages it consumed. This feature lets you use third-party tools to monitor the Vertica cluster's progress as it loads messages. By default, Vertica reports its progress to a consumer group named vertica-databaseName, where databaseName is the name of the Vertica database. You can change the name of this consumer group when you define a scheduler or perform a manual load.

For example, you can use Kafka's kafka-consumer-groups.sh script (located in the bin directory of your Kafka installation) to view the status of the Vertica consumer group. The following example lists the consumer groups defined in the Kafka cluster, and then shows the details of the Vertica consumer group:

$ cd /path/to/kafka/bin
$ ./kafka-consumer-groups.sh --list --bootstrap-server localhost:9092
Note: This will not show information about old Zookeeper-based consumers.

vertica-vmart
$ ./kafka-consumer-groups.sh --describe --group vertica-vmart \
   --bootstrap-server localhost:9092
Note: This will not show information about old Zookeeper-based consumers.

Consumer group 'vertica-vmart' has no active members.

TOPIC                          PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG        CONSUMER-ID                                       HOST                           CLIENT-ID
web_hits                       0          24500           30000           5500       -                                                 -                              -

From the output, you can see that Vertica reports its consumption of messages back to the vertica-vmart consumer group. This group is the default consumer group when Vertica has the example VMart database loaded. The second command lists the topics being consumed by the vertica-vmart consumer group. You can see that the Vertica cluster has read 24500 of the 30000 messages in the topic's only partition. Later, running the same command will show the Vertica cluster's progress:

$ cd /path/to/kafka/bin
$ ./kafka-consumer-groups.sh --describe --group vertica-vmart \
    --bootstrap-server localhost:9092
Note: This will not show information about old Zookeeper-based consumers.

Consumer group 'vertica-vmart' has no active members.

TOPIC                          PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG        CONSUMER-ID                                       HOST                           CLIENT-ID
web_hits                       0          30000           30000           0          -                                                 -                              -
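
The LAG column is the difference between LOG-END-OFFSET and CURRENT-OFFSET. As a rough sketch, the following shell function reads the --describe output on standard input and recomputes the lag for each partition. The name summarize_lag is hypothetical and is not part of Kafka or Vertica. The function assumes the column order shown in the output above, and it skips any row whose partition or offsets are not numeric:

```shell
# summarize_lag: hypothetical helper, not part of Kafka or Vertica.
# Reads `kafka-consumer-groups.sh --describe` output on stdin.
# Assumed column order: TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG ...
summarize_lag() {
    awk '
        # Only data rows have numeric values in fields 2 through 4.
        # The "Note:" line, the "no active members" line, and the header are skipped.
        $2 ~ /^[0-9]+$/ && $3 ~ /^[0-9]+$/ && $4 ~ /^[0-9]+$/ {
            lag = $4 - $3
            # %.0f avoids integer clamping that some awk versions apply to %d.
            printf "%s[%s] consumed=%s end=%s lag=%.0f\n", $1, $2, $3, $4, lag
            total += lag
        }
        END { printf "total_lag=%.0f\n", total }
    '
}
```

You could then pipe the script's output into the function:

$ ./kafka-consumer-groups.sh --describe --group vertica-vmart --bootstrap-server localhost:9092 | summarize_lag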

Changing the consumer group where Vertica reports its progress

You can change the consumer group that Vertica reports its progress to when consuming messages.

Changing for automatic loads with the scheduler

When using a scheduler, you set the consumer group by passing the --consumer-group-id argument to the vkconfig script's scheduler or microbatch tool. For example, suppose you want the example scheduler shown in Setting up a scheduler to report its consumption to a consumer group named vertica-database. You could use the command:

$ /opt/vertica/packages/kafka/bin/vkconfig microbatch --update \
    --conf weblog.conf --microbatch weblog --consumer-group-id vertica-database

When the scheduler begins loading data, it will start updating the new consumer group. You can see this on a Kafka node using kafka-consumer-groups.sh.

Use the --list option to return the consumer groups:

$ /path/to/kafka/bin/kafka-consumer-groups.sh --list --bootstrap-server localhost:9092
Note: This will not show information about old Zookeeper-based consumers.

vertica-database
vertica-vmart

Use the --describe and --group options to return details about a specific consumer group:

$ /path/to/kafka/bin/kafka-consumer-groups.sh --describe --group vertica-database \
                                          --bootstrap-server localhost:9092
Note: This will not show information about old Zookeeper-based consumers.

Consumer group 'vertica-database' has no active members.

TOPIC                          PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG        CONSUMER-ID                                       HOST                           CLIENT-ID
web_hits                       0          30300           30300           0          -                                                 -                              -
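
If you want a script to wait until Vertica has caught up with a topic, one option is to test the LAG column. The following is a minimal sketch, not a Vertica or Kafka utility. The name group_caught_up is hypothetical. The function reads --describe output on standard input and succeeds only when every partition row reports a lag of 0. It assumes the column layout shown above, with LAG as the fifth column:

```shell
# group_caught_up: hypothetical helper, not part of Kafka or Vertica.
# Reads `kafka-consumer-groups.sh --describe` output on stdin.
# Exit status 0: at least one partition row was found and every LAG value is 0.
# Exit status 1: some partition is behind, or no partition rows were found
#                (so an empty or failed query is not mistaken for "caught up").
group_caught_up() {
    awk '
        # Data rows have a numeric PARTITION (field 2) and a numeric LAG (field 5).
        $2 ~ /^[0-9]+$/ && $5 ~ /^[0-9]+$/ {
            rows++
            if ($5 + 0 != 0) behind++
        }
        END {
            if (rows > 0 && behind == 0) exit 0
            exit 1
        }
    '
}
```

A polling loop built on this function might look like the following. Keep in mind that a lag of 0 only describes the moment of the query if producers are still writing to the topic:

$ until /path/to/kafka/bin/kafka-consumer-groups.sh --describe --group vertica-database --bootstrap-server localhost:9092 | group_caught_up; do sleep 10; done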

Changing for manual loads

To change the consumer group when manually loading data, use the group_id parameter of the KafkaSource function:

=> COPY web_hits SOURCE KafkaSource(stream='web_hits|0|-2',
                                    brokers='kafka01.example.com:9092',
                                    stop_on_eof=True,
                                    group_id='vertica_database')
                 PARSER KafkaJSONParser();
 Rows Loaded
-------------
       50000
(1 row)

Using consumer group offsets when loading messages

You can choose to have your scheduler, manual load, or custom loading script start loading messages from the consumer group's offset. To load messages from the last offset stored in the consumer group, use the special -3 offset.

Automatic load with the scheduler example

To instruct your scheduler to load messages from the consumer group's saved offset, use the --offset argument of the vkconfig script's microbatch tool.

  1. Stop the scheduler using the shutdown command and the configuration file that you used to create the scheduler:

    $ /opt/vertica/packages/kafka/bin/vkconfig shutdown --conf weblog.conf
    
  2. Set the microbatch --offset option to -3:

    $ /opt/vertica/packages/kafka/bin/vkconfig microbatch --update --conf weblog.conf --microbatch weblog --offset -3
    

This sets the offset to -3 for all topic partitions that your scheduler reads from. The scheduler begins the next load with the consumer group's saved offset, and all subsequent loads use the offset saved in stream_microbatch_history.

Manual load example

This example loads messages from the web_hits topic that has one partition consisting of 51,000 messages. For details about manual loads with KafkaSource, see Manually consume data from Kafka.

  1. The first COPY statement creates a consumer group named vertica_manual, and loads the first 50,000 messages from the first partition in the web_hits topic:

    => COPY web_hits
       SOURCE KafkaSource(stream='web_hits|0|0|50000',
                                  brokers='kafka01.example.com:9092',
                                  stop_on_eof=True,
                                  group_id='vertica_manual')
       PARSER KafkaJSONParser()
       REJECTED DATA AS TABLE public.web_hits_rejections;
     Rows Loaded
    -------------
           50000
    (1 row)
    
  2. The next COPY statement passes -3 as the start_offset stream parameter to load from the consumer group's saved offset:

    => COPY web_hits
       SOURCE KafkaSource(stream='web_hits|0|-3',
                                  brokers='kafka01.example.com:9092',
                                  stop_on_eof=True,
                                  group_id='vertica_manual')
       PARSER KafkaJSONParser()
       REJECTED DATA AS TABLE public.web_hits_rejections;
     Rows Loaded
    -------------
            1000
    (1 row)
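
The stream values in these two statements follow the pattern topic|partition|start_offset, with an optional |end_offset. If you generate COPY statements from a script, helpers along the following lines could build the value and estimate how many messages a load from the saved offset should return. This is only a sketch. The names kafka_stream_spec and remaining_messages are hypothetical, and the estimate assumes that no new messages arrive between the two loads:

```shell
# Hypothetical helpers for illustration; not part of Vertica or Kafka.

# Build a KafkaSource stream value: topic|partition|start_offset[|end_offset].
# Pass -3 as start_offset to start from the consumer group's saved offset.
kafka_stream_spec() {
    spec_topic=$1
    spec_partition=$2
    spec_start=$3
    spec_end=${4:-}
    if [ -n "$spec_end" ]; then
        printf '%s|%s|%s|%s\n' "$spec_topic" "$spec_partition" "$spec_start" "$spec_end"
    else
        printf '%s|%s|%s\n' "$spec_topic" "$spec_partition" "$spec_start"
    fi
}

# Messages left to load = partition's log end offset - consumer group's saved offset.
remaining_messages() {
    echo $(( $1 - $2 ))
}
```

For the example above, kafka_stream_spec web_hits 0 0 50000 and kafka_stream_spec web_hits 0 -3 produce the two stream values, and remaining_messages 51000 50000 gives the 1000 rows that the second COPY statement loads.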
    

Disabling consumer group reporting

Vertica reports the offsets of the messages it consumes to Kafka by default. If you do not specifically configure a consumer group for Vertica, it still reports its offsets to a consumer group named vertica-databaseName (where databaseName is the name of the database that Vertica is currently running).

If you want to completely disable having Vertica report its consumption back to Kafka, you can set the consumer group to an empty string or NULL. For example:

=> COPY web_hits SOURCE KafkaSource(stream='web_hits|0|-2',
                                    brokers='kafka01.example.com:9092',
                                    stop_on_eof=True,
                                    group_id=NULL)
                 PARSER KafkaJSONParser();
 Rows Loaded
-------------
       60000
(1 row)