Setting up a scheduler

You set up a scheduler using the Linux command line.

You set up a scheduler using the Linux command line. Usually you perform the configuration on the host where you want your scheduler to run. It can be one of your Vertica hosts, or a separate host where you have installed the vkconfig utility (see The vkconfig Script for more information).

Follow these steps to set up and start a scheduler to stream data from Kafka to Vertica:

  1. Create a Config File (Optional)

  2. Add the Kafka Bin Directory to Your Path (Optional)

  3. Create a Resource Pool for Your Scheduler

  4. Create the Scheduler

  5. Create a Cluster

  6. Create a Data Table

  7. Create a Source

  8. Create a Target

  9. Create a Load-Spec

  10. Create a Microbatch

  11. Launch the Scheduler

These steps are explained in the following sections. These sections will use the example of loading web log data (hits on a web site) from Kafka into a Vertica table.

Create a config file (optional)

Many of the arguments you supply to the vkconfig script while creating a scheduler do not change. For example, you often need to pass a username and password to Vertica to authorize the changes to be made in the database. Adding the username and password to each call to vkconfig is tedious and error-prone.

Instead, you can pass the vkconfig utility a configuration file using the --conf option that specifies these arguments for you. It can save you a lot of typing and frustration.

The config file is a text file with a keyword=value pair on each line. Each keyword is a vkconfig command-line option, such as the ones listed in Common vkconfig script options.

The following example shows a config file named weblog.conf that will be used to define a scheduler named weblog_sched. This config file is used throughout the rest of this example.


# The configuraton options for the weblog_sched scheduler.
username=dbadmin
password=mypassword
dbhost=vertica01.example.com
dbport=5433
config-schema=weblog_sched

Add the vkconfig directory to your path (optional)

The vkconfig script is located in the /opt/vertica/packages/kafka/bin directory. Typing this path for each call to vkconfig is tedious. You can add vkconfig to your search path for your current Linux session using the following command:

$ export PATH=/opt/vertica/packages/kafka/bin:$PATH

For the rest of your session, you are able to call vkconfig without specifying its entire path:

$ vkconfig
Invalid tool
Valid options are scheduler, cluster, source, target, load-spec, microbatch, sync, launch,
shutdown, help

If you want to make this setting permanent, add the export statement to your ~/.profile file. The rest of this example assumes that you have added this directory to your shell's search path.

Create a resource pool for your scheduler

Vertica recommends you always create a resource pool specifically for each scheduler. Schedulers assume they have exclusive use of the resource pool they are assigned. Using a separate pool for a scheduler lets you fine-tune its impact on your Vertica cluster's performance. You create resource pools within Vertica using the CREATE RESOURCE POOL statement.

The following resource pool settings play an important role when creating your scheduler's resource pool:

  • PLANNEDCONCURRENCY determines the number of microbatches (COPY statements) that the scheduler sends to Vertica simultaneously.

  • EXECUTIONPARALLELISM determines the maximum number of threads that each node creates to process a microbatch's partitions.

  • QUEUETIMEOUT provides manual control over resource timings. Set this to 0 to allow the scheduler to manage timings.

See Managing scheduler resources and performance for detailed information about these settings and how to fine-tune a resource pool for your scheduler.

The following CREATE RESOURCE POOL statement creates a resource pool that loads 1 microbatch and processes 1 partition:

=> CREATE RESOURCE POOL weblog_pool
    MEMORYSIZE '10%'
    PLANNEDCONCURRENCY 1
    EXECUTIONPARALLELISM 1
    QUEUETIMEOUT 0;

If you do not create and assign a resource pool for your scheduler, it uses a portion of the GENERAL resource pool. Vertica suggests you do not use the GENERAL pool for schedulers used in production environments. This fallback to using the GENERAL pool is intended as a convenience during testing your scheduler configuration. When you are ready to deploy your scheduler, create a resource pool that you have tuned to its specific needs. Each time you start a scheduler that is using the GENERAL pool, the vkconfig utility will display a warning message.

Not allocating enough resources to your schedulers can result in errors. For example, you may get OVERSHOT DEADLINE FOR FRAME errors if the scheduler is not able to load data from all of the topics it is supposed to in a data frame.

See Resource pool architecture for more information about resource pools.

Create the scheduler

Vertica includes a default scheduler named stream_config. You can use this scheduler or create a new scheduler using the vkconfig script's scheduler tool with the --create and --config-schema options:

$ vkconfig scheduler --create --config-schema scheduler_name --conf conf_file

The --create and --config-schema options are the only ones required to add a scheduler with default options. This command creates a new schema in Vertica that holds the scheduler's configuration. See What Happens When You Create a Scheduler for details on the creation of the scheduler's schema.

You can use additional configuration parameters to further customize your scheduler. See Scheduler tool options for more information.

The following example:

  • Creates a scheduler named weblog_sched using the --config-schema option.

  • Grants privileges to configure and run the scheduler to the Vertica user named kafka_user with the --operator option. The dbadmin user must specify additional privileges separately.

  • Specifies a frame duration of seven minutes with the --frame-duration option. For more information about picking a frame duration, see Choosing a frame duration.

  • Sets the resource pool that the scheduler uses to the weblog_pool created earlier:

$ vkconfig scheduler --create --config-schema weblog_sched --operator kafka_user \
  --frame-duration '00:07:00' --resource-pool weblog_pool --conf weblog.conf

Create a cluster

You must associate at least one Kafka cluster with your scheduler. Schedulers can access more than one Kafka cluster. To create a cluster, you supply a name for the cluster and host names and ports the Kafka cluster's brokers.

When you create a cluster, the scheduler attempts to validate it by connecting to the Kafka cluster. If it successfully connects, the scheduler automatically retrieves the list of all brokers in the cluster. Therefore, you do not have to list every single broker in the --hosts parameter.

The following example creates a cluster named kafka_weblog, with two Kafka broker hosts: kafka01 and kafka03 in the example.com domain. The Kafka brokers are running on port 9092.

$ vkconfig cluster --create --cluster kafka_weblog \
  --hosts kafka01.example.com:9092,kafka03.example.com:9092 --conf weblog.conf

See Cluster tool options for more information.

Create a source

Next, create at least one source for your scheduler to read. The source defines the Kafka topic the scheduler loads data from as well as the number of partitions the topic contains.

To create and associate a source with a configured scheduler, use the source tool. When you create a source, Vertica connects to the Kafka cluster to verify that the topic exists. So, before you create the source, make sure that the topic already exists in your Kafka cluster. Because Vertica verifies the existence of the topic, you must supply the previously-defined cluster name using the --cluster option.

The following example creates a source for the Kafka topic named web_hits on the cluster created in the previous step. This topic has a single partition.

$ vkconfig source --create --cluster kafka_weblog --source web_hits --partitions 1 --conf weblog.conf

See Source tool options for more information.

Create a data table

Before you can create a target for your scheduler, you must create a target table in your Vertica database. This is the table Vertica uses to store the data the scheduler loads from Kafka. You must decide which type of table to create for your target:

  • A standard Vertica database table, which you create using the CREATE TABLE statement. This type of table stores data efficiently. However, you must ensure that its columns match the data format of the messages in Kafka topic you are loading. You cannot load complex types of data into a standard Vertica table.

  • A flex table, which you create using CREATE FLEXIBLE TABLE. A flex table is less efficient than a standard Vertica database table. However, it is flexible enough to deal with data whose schema varies and changes. It also can load most complex data types that (such as maps and lists) that standard Vertica tables cannot.

The data in this example is in a set format, so the best table to use is a standard Vertica table. The following example creates a table named web_hits to hold four columns of data. This table is located in the public schema.

=> CREATE TABLE web_hits (ip VARCHAR(16), url VARCHAR(256), date DATETIME, user_agent VARCHAR(1024));

Create a target

Once you have created your target table, you can create your scheduler's target. The target tells your scheduler where to store the data it retrieves from Kafka. This table must exist when you create your target. You use the vkconfig script's target tool with the --target-schema and --target_table options to specify the Vertica target table's schema and name. The following example adds a target for the table created in the previous step.

$ vkconfig target --create --target-schema public --target-table web_hits --conf weblog.conf

See Target tool options for more information.

Create a load spec

The scheduler's load spec provides parameters that Vertica uses when parsing the data loaded from Kafka. The most important option is --parser which sets the parser that Vertica uses to parse the data. You have three parser options:

In this example, the data being loaded from Kafka is in JSON format. The following command creates a load spec named weblog_load and sets the parser to KafkaJSONParser.

$ vkconfig load-spec --create --parser KafkaJSONParser --load-spec weblog_load --conf weblog.conf

See Load spec tool options for more information.

Create a microbatch

The microbatch combines all of the settings added to the scheduler so far to define the individual COPY statements that the scheduler uses to load data from Kafka.

The following example uses all of the settings created in the previous examples to create a microbatch called weblog.

$ vkconfig microbatch --create --microbatch weblog --target-schema public --target-table web_hits \
           --add-source web_hits --add-source-cluster kafka_weblog --load-spec weblog_load \
           --conf weblog.conf

For microbatches that might benefit from a reduced transaction size, consider using the --max-parallelism option when creating the microbatch. This option splits a single microbatch with multiple partitions into the specified number of simultaneous COPY statements consisting of fewer partitions.

See Microbatch tool options for more information about --max-parallelism and other options.

Launch the scheduler

Once you've created at least one microbatch, you can run your scheduler. You start your scheduler using the launch tool, passing it the name of the scheduler's schema. The scheduler begins scheduling microbatch loads for every enabled microbatch defined in its schema.

The following example launches the weblog scheduler defined in the previous steps. It uses the nohup command to prevent the scheduler being killed when the user logs out, and redirects stdout and stderr to prevent a nohup.out file from being created.

$ nohup vkconfig launch --conf weblog.conf >/dev/null 2>&1 &

See Launch tool options for more information.

Checking that the scheduler is running

Once you have launched your scheduler, you can verify that it is running by querying the stream_microbatch_history table in the scheduler's schema. This table lists the results of each microbatch the scheduler has run.

For example, this query lists the microbatch name, the start and end times of the microbatch, the start and end offset of the batch, and why the batch ended. The results are ordered to start from when the scheduler was launched:

=> SELECT microbatch, batch_start, batch_end, start_offset,
          end_offset, end_reason
          FROM weblog_sched.stream_microbatch_history
          ORDER BY batch_start DESC LIMIT 10;

 microbatch |        batch_start         |         batch_end          | start_offset | end_offset |  end_reason
------------+----------------------------+----------------------------+--------------+------------+---------------
 weblog     | 2017-10-04 09:30:19.100752 | 2017-10-04 09:30:20.455739 |           -2 |      34931 | END_OF_STREAM
 weblog     | 2017-10-04 09:30:49.161756 | 2017-10-04 09:30:49.873389 |        34931 |      34955 | END_OF_STREAM
 weblog     | 2017-10-04 09:31:19.25731  | 2017-10-04 09:31:22.203173 |        34955 |      35274 | END_OF_STREAM
 weblog     | 2017-10-04 09:31:49.299119 | 2017-10-04 09:31:50.669889 |        35274 |      35555 | END_OF_STREAM
 weblog     | 2017-10-04 09:32:19.43153  | 2017-10-04 09:32:20.7519   |        35555 |      35852 | END_OF_STREAM
 weblog     | 2017-10-04 09:32:49.397684 | 2017-10-04 09:32:50.091675 |        35852 |      36142 | END_OF_STREAM
 weblog     | 2017-10-04 09:33:19.449274 | 2017-10-04 09:33:20.724478 |        36142 |      36444 | END_OF_STREAM
 weblog     | 2017-10-04 09:33:49.481563 | 2017-10-04 09:33:50.068068 |        36444 |      36734 | END_OF_STREAM
 weblog     | 2017-10-04 09:34:19.661624 | 2017-10-04 09:34:20.639078 |        36734 |      37036 | END_OF_STREAM
 weblog     | 2017-10-04 09:34:49.612355 | 2017-10-04 09:34:50.121824 |        37036 |      37327 | END_OF_STREAM
(10 rows)