Containerized Kafka Scheduler

The Vertica Apache Kafka integration includes a scheduler, a mechanism that you can configure to automatically consume data from Kafka and load that data into a Vertica database. The Vertica Kafka Scheduler is the containerized version of that scheduler that runs natively on Kubernetes. Both schedulers have identical functionality and accept the same configuration parameters.

This document provides quickstart instructions for creating, configuring, and launching the Vertica Kafka Scheduler on Kubernetes. It includes minimal details about each command. For in-depth documentation about scheduler behavior and advanced configuration, see Automatically consume data from Kafka with a scheduler.

Prerequisites

Add the Helm charts

To simplify deployment, Vertica packages the Kafka Scheduler in a Helm chart. Add the Vertica chart repository to your local Helm repositories:

$ helm repo add vertica-charts https://vertica.github.io/charts
$ helm repo update

Launch a scheduler

To launch a scheduler, you install the scheduler Helm chart, create the target table in a Vertica database, override the default scheduler configuration, configure the scheduler components, and then launch the scheduler.

The Vertica Kafka scheduler has two modes:

  • initializer: Configuration mode. Starts a container so that you can exec into it and configure it.
  • launcher: Launch mode. Starts a container that automatically calls vkconfig launch to run the scheduler. Run this mode after you configure the scheduler in initializer mode.

Use the initializer mode to configure all the scheduler settings. After you configure the scheduler, upgrade the Helm chart to launch it in launcher mode.

Install the scheduler

Install the scheduler Helm chart to start the scheduler in initializer mode. The following helm install command deploys a scheduler named vkscheduler in the kafka namespace:

$ helm install vkscheduler --namespace kafka vertica-charts/vertica-kafka-scheduler
NAME: vkscheduler
LAST DEPLOYED: Tue Apr  2 11:53:49 2024
NAMESPACE: kafka
STATUS: deployed
REVISION: 1
TEST SUITE: None
NOTES:
Vertica's Kafka Scheduler has been deployed.

The initializer pod is running. You can exec into it and run your vkconfig
commands with this command:

kubectl exec -n kafka -it vkscheduler-vertica-kafka-scheduler-initializer -- bash

The command output provides the kubectl exec command that you can use to access a shell in the initializer pod and configure the scheduler.

Verify that the scheduler's initializer pod is running:

$ kubectl get pods --namespace kafka
NAME                                              READY   STATUS    RESTARTS      AGE
...
vkscheduler-vertica-kafka-scheduler-initializer   1/1     Running   1 (12s ago)   77s
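
If you script the installation, checking the kubectl get pods output by hand is awkward. The following is a minimal sketch, not part of the Vertica tooling, that wraps kubectl wait in a hypothetical wait_for_initializer function. The 120-second timeout is an arbitrary assumption, and the default namespace and pod name are the ones used in this guide:

```shell
#!/bin/sh
# Hypothetical helper: block until the initializer pod reports Ready.
# Arguments are optional; the defaults are the names used in this guide.
wait_for_initializer() {
    namespace=${1:-kafka}
    pod=${2:-vkscheduler-vertica-kafka-scheduler-initializer}
    # kubectl wait exits non-zero if the pod is not Ready before the timeout.
    kubectl wait --namespace "$namespace" \
        --for=condition=Ready "pod/$pod" \
        --timeout=120s
}
```

Call wait_for_initializer after helm install returns, and continue with the next step only if the function exits with status 0.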

Create the target table

The target table is the Vertica database table that stores the data that the scheduler loads from Kafka. In this example, you create a flex table so that you can load data with an unknown or varying schema:

  1. Create a flex table to store the data:
    => CREATE FLEX TABLE KafkaFlex();
    CREATE TABLE
    
  2. Create a user for the flex table:
    => CREATE USER KafkaUser;
    CREATE USER
    
  3. Create a resource pool for the scheduler. Vertica recommends that each scheduler have exclusive use of its own resource pool so that you can fine-tune the scheduler's impact on your Vertica cluster's performance:
    => CREATE RESOURCE POOL scheduler_pool PLANNEDCONCURRENCY 1;
    CREATE RESOURCE POOL
    
    For additional details, see Managing scheduler resources and performance.
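
If you prefer to run the three statements above non-interactively, one option is to pass them to vsql on standard input. This is only a sketch: it assumes that vsql is on your PATH, that you connect as dbadmin, and that VERTICA_HOST (a hypothetical variable, defaulting here to a placeholder address) points at a host in your Vertica cluster. The create_target_objects function name is also illustrative:

```shell
#!/bin/sh
# Hypothetical helper: send the statements from this section to vsql on stdin.
# VERTICA_HOST is an assumed variable; the default is only a placeholder address.
create_target_objects() {
    vsql -h "${VERTICA_HOST:-10.20.30.40}" -U dbadmin <<'EOF'
CREATE FLEX TABLE KafkaFlex();
CREATE USER KafkaUser;
CREATE RESOURCE POOL scheduler_pool PLANNEDCONCURRENCY 1;
EOF
}
```

vsql might prompt for a password, depending on how authentication is configured for your database.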

Override scheduler configuration

After you install the scheduler, you need to configure it for your environment. The scheduler configuration file is vkconfig.conf, and it is stored in the following location in the initializer pod:

/opt/vertica/packages/kafka/config/vkconfig.conf

By default, vkconfig.conf contains the following values:

config-schema=Scheduler
dbport=5433
enable-ssl=false
username=dbadmin

vkconfig.conf is read-only from within the filesystem, so you must upgrade the Helm chart to override the default settings. The following YAML-formatted file provides a template for scheduler overrides:

image:
  repository: opentext/kafka-scheduler
  pullPolicy: IfNotPresent
  tag: scheduler-version
launcherEnabled: false
replicaCount: 1
initializerEnabled: true
conf:
  generate: true
  content:
    config-schema: scheduler-name
    username: dbadmin
    dbport: "5433"
    enable-ssl: "false"
    dbhost: vertica-db-host-ip
tls:
  enabled: false
serviceAccount:
  create: true

This template requires that you update the following values:

  • image.tag: Scheduler version. The scheduler version must match the version of the Vertica database that you used to create the target table.
  • conf.content.config-schema: Scheduler name. When you launch the scheduler, the Vertica database creates a schema that you can track with data streaming tables.
  • conf.content.dbhost: IP address for a host in your Vertica cluster.

For example, the following scheduler-overrides.yaml file sets these values:

image:
  repository: opentext/kafka-scheduler
  pullPolicy: IfNotPresent
  tag: 24.2.0
launcherEnabled: false
replicaCount: 1
initializerEnabled: true
conf:
  generate: true
  content:
    config-schema: scheduler-sample
    username: dbadmin
    dbport: "5433"
    enable-ssl: "false"
    dbhost: 10.20.30.40
tls:
  enabled: false
serviceAccount:
  create: true
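
Because only three values change between environments, you can also generate the file instead of editing it by hand. The following sketch writes the same scheduler-overrides.yaml from three shell variables. The variable names SCHEDULER_VERSION, SCHEDULER_NAME, and VERTICA_HOST are hypothetical, and their defaults are the sample values from this section:

```shell
#!/bin/sh
# Hypothetical variable names; override them in your environment as needed.
SCHEDULER_VERSION=${SCHEDULER_VERSION:-24.2.0}
SCHEDULER_NAME=${SCHEDULER_NAME:-scheduler-sample}
VERTICA_HOST=${VERTICA_HOST:-10.20.30.40}

# Write the overrides file with the three required values filled in.
cat > scheduler-overrides.yaml <<EOF
image:
  repository: opentext/kafka-scheduler
  pullPolicy: IfNotPresent
  tag: ${SCHEDULER_VERSION}
launcherEnabled: false
replicaCount: 1
initializerEnabled: true
conf:
  generate: true
  content:
    config-schema: ${SCHEDULER_NAME}
    username: dbadmin
    dbport: "5433"
    enable-ssl: "false"
    dbhost: ${VERTICA_HOST}
tls:
  enabled: false
serviceAccount:
  create: true
EOF
```

Keeping the port and SSL values quoted, as in the template, preserves them as strings in the generated file.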

After you define your overrides, use helm upgrade to apply the overrides to the scheduler initializer pod:

$ helm upgrade vkscheduler --namespace kafka vertica-charts/vertica-kafka-scheduler -f scheduler-overrides.yaml
Release "vkscheduler" has been upgraded. Happy Helming!
NAME: vkscheduler
LAST DEPLOYED: Tue Apr  2 11:54:35 2024
NAMESPACE: kafka
STATUS: deployed
REVISION: 2
TEST SUITE: None
NOTES:
Vertica's Kafka Scheduler has been deployed.

The initializer pod is running. You can exec into it and run your vkconfig
commands with this command:

kubectl exec -n kafka -it vkscheduler-vertica-kafka-scheduler-initializer -- bash

Configure the scheduler

After you update vkconfig.conf, you need to configure the scheduler itself. A scheduler is a combination of multiple components that you must configure individually with the vkconfig command.

To configure the scheduler, you must access the scheduler initializer pod to execute the vkconfig commands:

  1. Access a bash shell in the scheduler initializer pod:

    $ kubectl exec -n kafka -it vkscheduler-vertica-kafka-scheduler-initializer -- bash
    
  2. Define the scheduler. This command identifies the Vertica user, resource pool, and settings such as frame duration:

    bash-5.1$ vkconfig scheduler --conf /opt/vertica/packages/kafka/config/vkconfig.conf \
         --frame-duration 00:00:10 \
         --create --operator KafkaUser \
         --eof-timeout-ms 2000 \
         --config-refresh 00:01:00 \
         --new-source-policy START \
         --resource-pool scheduler_pool
    
  3. Define the target, which is the Vertica database table that the scheduler loads data into:

    bash-5.1$ vkconfig target --add --conf /opt/vertica/packages/kafka/config/vkconfig.conf \
         --target-schema public \
         --target-table KafkaFlex
    
  4. Define the load spec. This defines how Vertica parses the data from Kafka:

    bash-5.1$ vkconfig load-spec --add --conf /opt/vertica/packages/kafka/config/vkconfig.conf \
         --load-spec KafkaSpec \
         --parser kafkajsonparser \
         --load-method DIRECT \
         --message-max-bytes 1000000
    
  5. Define the cluster. This identifies your Kafka cluster:

    bash-5.1$ vkconfig cluster --add --conf /opt/vertica/packages/kafka/config/vkconfig.conf \
         --cluster KafkaCluster \
         --hosts kafka01.example.com:9092,kafka03.example.com:9092
    
  6. Define the source. The source is the Kafka topic and partitions that you want to load data from:

    bash-5.1$ vkconfig source --add --conf /opt/vertica/packages/kafka/config/vkconfig.conf \
         --cluster KafkaCluster \
         --source KafkaTopic1 \
         --partitions 1
    
  7. Define the microbatch. The microbatch combines the components you created in the previous steps:

    bash-5.1$ vkconfig microbatch --add --conf /opt/vertica/packages/kafka/config/vkconfig.conf \
         --microbatch KafkaBatch1 \
         --add-source KafkaTopic1 \
         --add-source-cluster KafkaCluster \
         --target-schema public \
         --target-table KafkaFlex \
         --rejection-schema public \
         --rejection-table KafkaFlex_rej \
         --load-spec KafkaSpec
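
Every command in the steps above repeats the same --conf path. As a convenience, you could define a small wrapper in the initializer pod's shell before you run them. This is a sketch only; the vk function and the VKCONF variable are hypothetical names and are not part of vkconfig:

```shell
#!/bin/sh
# Path to the scheduler configuration file inside the initializer pod.
VKCONF=/opt/vertica/packages/kafka/config/vkconfig.conf

# Hypothetical wrapper: "vk <tool> [options...]" runs
# "vkconfig <tool> --conf $VKCONF [options...]".
vk() {
    tool=$1
    shift
    vkconfig "$tool" --conf "$VKCONF" "$@"
}
```

With the wrapper defined, step 3 becomes vk target --add --target-schema public --target-table KafkaFlex. The function exists only in the current shell session, so define it again if you exit and reopen the pod shell.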
    

After you configure the scheduler, exit the pod:

bash-5.1$ exit
exit
$

Launch the scheduler

After you configure the scheduler, you must launch it. To launch the scheduler, upgrade the Helm chart to change the launcherEnabled field to true:

$ helm upgrade --namespace kafka vkscheduler vertica-charts/vertica-kafka-scheduler \
    --set "launcherEnabled=true"

A new pod starts that runs the scheduler in launch mode:

$ kubectl get pods --namespace kafka
NAME                                                   READY   STATUS      RESTARTS      AGE
vkscheduler-vertica-kafka-scheduler-66d5c49dbf-nc86k   1/1     Running     0             14s
vkscheduler-vertica-kafka-scheduler-initializer        1/1     Running     0             85m
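
The helm upgrade command in this section passes only --set. Depending on how Helm merges values for your release, the settings from scheduler-overrides.yaml might not carry over to the new revision when new values are supplied on the command line. A cautious variant, sketched here under that assumption, passes the overrides file again and then sets launcherEnabled. The launch_scheduler function name is hypothetical:

```shell
#!/bin/sh
# Hypothetical helper: launch the scheduler while re-applying the overrides file.
launch_scheduler() {
    overrides=${1:-scheduler-overrides.yaml}
    # -f re-applies the overrides file; --set takes precedence over the file,
    # so launcherEnabled changes from false to true.
    helm upgrade vkscheduler --namespace kafka \
        vertica-charts/vertica-kafka-scheduler \
        -f "$overrides" \
        --set "launcherEnabled=true"
}
```

After the upgrade, you can run helm get values vkscheduler -n kafka to confirm which values the new revision uses.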

Test your deployment

Now that you have a containerized Kafka cluster and VerticaDB CR running, you can test that the scheduler automatically loads data from the Kafka producer into Vertica:

  1. Open a shell that is running your Kafka producer and send sample JSON data:

    >{"a": 1}
    >{"a": 1000}
    
  2. Open a terminal with access to your Vertica cluster and vsql. Query the KafkaFlex table to confirm that it contains the sample JSON data that you sent through the Kafka producer:

    => SELECT compute_flextable_keys_and_build_view('KafkaFlex');
                                     compute_flextable_keys_and_build_view                    
    --------------------------------------------------------------------------------------------------------
     Please see public.KafkaFlex_keys for updated keys
    The view public.KafkaFlex_view is ready for querying
    (1 row)
    
    => SELECT a from KafkaFlex_view;
     a
    -----
     1
     1000
    (2 rows)
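
To make the test repeatable, you can generate the sample messages from a script instead of typing them at the producer prompt. The sketch below only prints the two JSON messages from step 1, one per line. The sample_messages function name is hypothetical:

```shell
#!/bin/sh
# Print the two sample JSON messages from this section, one per line.
sample_messages() {
    printf '%s\n' '{"a": 1}' '{"a": 1000}'
}
```

How you deliver the messages depends on your producer. For example, assuming the console producer script from the Apache Kafka distribution and the broker and topic names used earlier in this guide, you might run sample_messages | kafka-console-producer.sh --bootstrap-server kafka01.example.com:9092 --topic KafkaTopic1.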
    

Clean up

To delete the scheduler, run the vkconfig scheduler tool with the --drop option and the name of the scheduler schema. Run the command from a shell in the scheduler initializer pod:

$ kubectl exec -n kafka -it vkscheduler-vertica-kafka-scheduler-initializer -- bash
bash-5.1$ /opt/vertica/packages/kafka/bin/vkconfig scheduler --drop --config-schema scheduler_sample

You can delete your Kubernetes resources with the helm uninstall command:

$ helm uninstall vkscheduler -n kafka
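
The two clean-up commands can be combined so that the Helm release is removed only after the scheduler schema is dropped. The following sketch runs vkconfig through kubectl exec without opening an interactive shell. The cleanup_scheduler function name is hypothetical, and you pass your scheduler schema name as its argument:

```shell
#!/bin/sh
# Hypothetical helper: drop the scheduler schema, then remove the Helm release.
# Usage: cleanup_scheduler <scheduler-schema-name>
cleanup_scheduler() {
    schema=$1
    # Run vkconfig inside the initializer pod without an interactive shell.
    kubectl exec -n kafka vkscheduler-vertica-kafka-scheduler-initializer -- \
        /opt/vertica/packages/kafka/bin/vkconfig scheduler --drop \
        --config-schema "$schema" || return 1
    # Uninstall only after the drop succeeds, so the pod remains for a retry.
    helm uninstall vkscheduler -n kafka
}
```

If the drop fails, the function returns before helm uninstall runs, which leaves the initializer pod available for troubleshooting.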