Setting up a scheduler
You set up a scheduler using the Linux command line. Usually you perform the configuration on the host where you want your scheduler to run. It can be one of your Vertica hosts, or a separate host where you have installed the vkconfig utility (see The vkconfig Script for more information).
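Follow these steps to set up and start a scheduler that streams data from Kafka to Vertica:
- Create a config file (optional)
- Add the vkconfig directory to your path (optional)
- Create a resource pool for your scheduler
- Create the scheduler
- Create a cluster
- Create a source
- Create a data table
- Create a target
- Create a load spec
- Create a microbatch
- Launch the scheduler
The following sections explain each step, using the example of loading web log data (hits on a website) from Kafka into a Vertica table.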
Create a config file (optional)
Many of the arguments you supply to the vkconfig script while creating a scheduler do not change. For example, you often need to pass a username and password to Vertica to authorize the changes to be made in the database. Adding the username and password to each call to vkconfig is tedious and error-prone.
Instead, you can use the --conf option to pass vkconfig a configuration file that specifies these arguments for you. This saves typing and reduces the chance of errors.
The config file is a text file with a keyword=value pair on each line. Each keyword is a vkconfig command-line option, such as the ones listed in Common vkconfig script options.
The following example shows a config file named weblog.conf that will be used to define a scheduler named weblog_sched. This config file is used throughout the rest of this example.
# The configuration options for the weblog_sched scheduler.
username=dbadmin
password=mypassword
dbhost=vertica01.example.com
dbport=5433
config-schema=weblog_sched
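Because this file holds a database password, it is worth creating it with owner-only permissions, as this page advises later for launching the scheduler. The following is a minimal sketch, assuming a POSIX shell; the CONF_FILE variable is a name chosen for this illustration and the values are the example values shown above.

```shell
#!/bin/sh
# Sketch: create weblog.conf so that only its owner can read or write it.
# CONF_FILE is a name chosen for this illustration.
CONF_FILE="weblog.conf"

# A umask of 077 makes files created by this shell private to the owner.
umask 077

cat > "$CONF_FILE" <<'EOF'
# The configuration options for the weblog_sched scheduler.
username=dbadmin
password=mypassword
dbhost=vertica01.example.com
dbport=5433
config-schema=weblog_sched
EOF

# Set the mode explicitly as well, in case the file already existed
# with looser permissions before it was overwritten.
chmod 600 "$CONF_FILE"

# Show the resulting permissions.
ls -l "$CONF_FILE"
```

Setting both the umask and the explicit mode is redundant for a new file, but the chmod also covers the case where an older, more permissive copy is being replaced.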
Add the vkconfig directory to your path (optional)
The vkconfig script is located in the /opt/vertica/packages/kafka/bin directory. Typing this path for each call to vkconfig is tedious. You can add vkconfig to your search path for your current Linux session using the following command:
$ export PATH=/opt/vertica/packages/kafka/bin:$PATH
For the rest of your session, you can call vkconfig without specifying its full path:
$ vkconfig
Invalid tool
Valid options are scheduler, cluster, source, target, load-spec, microbatch, sync, launch,
shutdown, help
If you want to make this setting permanent, add the export statement to your ~/.profile file. The rest of this example assumes that you have added this directory to your shell's search path.
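If you script this step, it helps to avoid appending the same export statement more than once. The following is a minimal sketch, assuming a POSIX shell; add_vkconfig_to_profile is a hypothetical helper name, not part of Vertica, and the demonstration writes to a temporary file instead of your real profile.

```shell
#!/bin/sh
# Sketch: append the PATH export to a profile file only if it is not there yet.
# add_vkconfig_to_profile is a hypothetical helper, not part of Vertica.
VKCONFIG_DIR="/opt/vertica/packages/kafka/bin"

add_vkconfig_to_profile() {
    profile_file="$1"
    # \$PATH keeps the variable unexpanded so the profile evaluates it at login.
    export_line="export PATH=$VKCONFIG_DIR:\$PATH"
    # -F matches the text literally, -x requires a whole-line match.
    if ! grep -qFx -e "$export_line" "$profile_file" 2>/dev/null; then
        printf '%s\n' "$export_line" >> "$profile_file"
    fi
}

# Demonstration on a throwaway file; the second call changes nothing.
demo_profile="$(mktemp)"
add_vkconfig_to_profile "$demo_profile"
add_vkconfig_to_profile "$demo_profile"
cat "$demo_profile"
rm -f "$demo_profile"
```

To apply it for real, you would call the helper with "$HOME/.profile" as its argument.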
Create a resource pool for your scheduler
Vertica recommends you always create a resource pool specifically for each scheduler. Schedulers assume they have exclusive use of the resource pool they are assigned. Using a separate pool for a scheduler lets you fine-tune its impact on your Vertica cluster's performance. You create resource pools within Vertica using the CREATE RESOURCE POOL statement.
The following resource pool settings play an important role when creating your scheduler's resource pool:
- PLANNEDCONCURRENCY determines the number of microbatches (COPY statements) that the scheduler sends to Vertica simultaneously.
- EXECUTIONPARALLELISM determines the maximum number of threads that each node creates to process a microbatch's partitions.
- QUEUETIMEOUT provides manual control over resource timings. Set this to 0 to allow the scheduler to manage timings.
See Managing scheduler resources and performance for detailed information about these settings and how to fine-tune a resource pool for your scheduler.
The following CREATE RESOURCE POOL statement creates a resource pool that loads 1 microbatch and processes 1 partition:
=> CREATE RESOURCE POOL weblog_pool
MEMORYSIZE '10%'
PLANNEDCONCURRENCY 1
EXECUTIONPARALLELISM 1
QUEUETIMEOUT 0;
If you do not create and assign a resource pool for your scheduler, it uses a portion of the GENERAL resource pool. Vertica recommends that you do not use the GENERAL pool for schedulers in production environments. This fallback to the GENERAL pool is intended as a convenience while you test your scheduler configuration. When you are ready to deploy your scheduler, create a resource pool that you have tuned to its specific needs. Each time you start a scheduler that uses the GENERAL pool, the vkconfig utility displays a warning message.
Not allocating enough resources to your schedulers can result in errors. For example, you may get OVERSHOT DEADLINE FOR FRAME errors if the scheduler is not able to load data from all of the topics it is supposed to in a data frame.
See Resource pool architecture for more information about resource pools.
Create the scheduler
Vertica includes a default scheduler named stream_config. You can use this scheduler or create a new scheduler using the vkconfig script's scheduler tool with the --create and --config-schema options:
$ vkconfig scheduler --create --config-schema scheduler_name --conf conf_file
The --create and --config-schema options are the only ones required to add a scheduler with default options. This command creates a new schema in Vertica that holds the scheduler's configuration. See What Happens When You Create a Scheduler for details on the creation of the scheduler's schema.
You can use additional configuration parameters to further customize your scheduler. See Scheduler tool options for more information.
The following example:
- Creates a scheduler named weblog_sched using the --config-schema option.
- Grants privileges to configure and run the scheduler to the Vertica user named kafka_user with the --operator option. The dbadmin user must specify additional privileges separately.
- Specifies a frame duration of seven minutes with the --frame-duration option. For more information about picking a frame duration, see Choosing a frame duration.
- Sets the resource pool that the scheduler uses to the weblog_pool created earlier:
$ vkconfig scheduler --create --config-schema weblog_sched --operator kafka_user \
--frame-duration '00:07:00' --resource-pool weblog_pool --conf weblog.conf
Note
Technically, the previous example doesn't need to supply the --config-schema argument because it is set in the weblog.conf file. It appears in this example for clarity. There's no harm in supplying it on the command line as well as in the configuration file, as long as the values match. If they do not match, the value given on the command line takes priority.
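Because a mismatched command-line value silently overrides the configuration file, a small pre-flight check can catch typos before you create a scheduler under the wrong schema name. The following is a minimal sketch, assuming a POSIX shell and the keyword=value file format described earlier; conf_value and check_schema are hypothetical helpers written for this illustration and are not part of vkconfig.

```shell
#!/bin/sh
# Sketch: warn when a --config-schema value given on the command line differs
# from the one in the configuration file. conf_value and check_schema are
# hypothetical helpers for illustration; they are not part of vkconfig.

# Print the value stored for a keyword in a keyword=value file.
conf_value() {
    sed -n "s/^$1=//p" "$2" | head -n 1
}

# Compare a command-line schema name with the file's config-schema entry.
check_schema() {
    cli_value="$1"
    file_value="$(conf_value config-schema "$2")"
    if [ -n "$file_value" ] && [ "$cli_value" != "$file_value" ]; then
        echo "warning: command line uses '$cli_value', $2 has '$file_value'"
        return 1
    fi
    echo "ok: scheduler schema is '$cli_value'"
}

# Demonstration against a throwaway file that mimics weblog.conf.
demo_conf="$(mktemp)"
printf 'username=dbadmin\nconfig-schema=weblog_sched\n' > "$demo_conf"
check_schema weblog_sched "$demo_conf"
check_schema other_sched "$demo_conf" || true
rm -f "$demo_conf"
```

The check only compares strings. It does not contact Vertica or change how vkconfig resolves its options.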
Create a cluster
You must associate at least one Kafka cluster with your scheduler. Schedulers can access more than one Kafka cluster. To create a cluster, you supply a name for the cluster and the host names and ports of the Kafka cluster's brokers.
When you create a cluster, the scheduler attempts to validate it by connecting to the Kafka cluster. If it successfully connects, the scheduler automatically retrieves the list of all brokers in the cluster. Therefore, you do not have to list every single broker in the --hosts parameter.
The following example creates a cluster named kafka_weblog, with two Kafka broker hosts: kafka01 and kafka03 in the example.com domain. The Kafka brokers are running on port 9092.
$ vkconfig cluster --create --cluster kafka_weblog \
--hosts kafka01.example.com:9092,kafka03.example.com:9092 --conf weblog.conf
See Cluster tool options for more information.
Create a source
Next, create at least one source for your scheduler to read. The source defines the Kafka topic the scheduler loads data from as well as the number of partitions the topic contains.
To create and associate a source with a configured scheduler, use the source tool. When you create a source, Vertica connects to the Kafka cluster to verify that the topic exists. So, before you create the source, make sure that the topic already exists in your Kafka cluster. Because Vertica verifies the existence of the topic, you must supply the previously defined cluster name using the --cluster option.
The following example creates a source for the Kafka topic named web_hits on the cluster created in the previous step. This topic has a single partition.
$ vkconfig source --create --cluster kafka_weblog --source web_hits --partitions 1 --conf weblog.conf
Note
The --partitions parameter is the number of partitions to load, not a list of individual partitions. For example, if you set this parameter to 3, the scheduler loads data from partitions 0, 1, and 2.
See Source tool options for more information.
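The counting rule in the note about --partitions can be illustrated with a few lines of shell. This is a minimal sketch, not something vkconfig provides; partitions_loaded is a hypothetical helper that prints the zero-based partition numbers covered by a given --partitions value.

```shell
#!/bin/sh
# Sketch: list the partition numbers covered by a given --partitions value.
# partitions_loaded is a hypothetical helper for illustration only.
partitions_loaded() {
    count="$1"
    i=0
    # Partitions are numbered from 0, so a count of N covers 0 through N-1.
    while [ "$i" -lt "$count" ]; do
        printf '%s\n' "$i"
        i=$((i + 1))
    done
}

# Prints 0, 1, and 2 on separate lines.
partitions_loaded 3
```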
Create a data table
Before you can create a target for your scheduler, you must create a target table in your Vertica database. This is the table Vertica uses to store the data the scheduler loads from Kafka. You must decide which type of table to create for your target:
- A standard Vertica database table, which you create using the CREATE TABLE statement. This type of table stores data efficiently. However, you must ensure that its columns match the data format of the messages in the Kafka topic you are loading. You cannot load complex types of data into a standard Vertica table.
- A flex table, which you create using CREATE FLEXIBLE TABLE. A flex table is less efficient than a standard Vertica database table. However, it is flexible enough to deal with data whose schema varies and changes. It can also load most complex data types (such as maps and lists) that standard Vertica tables cannot.
Important
Avoid having columns with primary key restrictions in your target table. The scheduler stops loading data if it encounters a row with a value that violates this restriction. If you must have a primary key restricted column, try to filter out any redundant values for that column in the streamed data before it is loaded by the scheduler.
The data in this example is in a set format, so the best table to use is a standard Vertica table. The following example creates a table named web_hits to hold four columns of data. This table is located in the public schema.
=> CREATE TABLE web_hits (ip VARCHAR(16), url VARCHAR(256), date DATETIME, user_agent VARCHAR(1024));
Note
You do not need to create a rejection table to store rejected messages. The scheduler creates the rejection table automatically.
Create a target
Once you have created your target table, you can create your scheduler's target. The target tells your scheduler where to store the data it retrieves from Kafka. This table must exist when you create your target. You use the vkconfig script's target tool with the --target-schema and --target-table options to specify the Vertica target table's schema and name. The following example adds a target for the table created in the previous step.
$ vkconfig target --create --target-schema public --target-table web_hits --conf weblog.conf
See Target tool options for more information.
Create a load spec
The scheduler's load spec provides parameters that Vertica uses when parsing the data loaded from Kafka. The most important option is --parser, which sets the parser that Vertica uses to parse the data. You have three parser options:
- KafkaAvroParser for data in Avro format.
- KafkaJSONParser for data in JSON format.
- KafkaParser to load each message into a single VARCHAR field. See Parsing custom formats for more information.
In this example, the data being loaded from Kafka is in JSON format. The following command creates a load spec named weblog_load and sets the parser to KafkaJSONParser.
$ vkconfig load-spec --create --parser KafkaJSONParser --load-spec weblog_load --conf weblog.conf
See Load spec tool options for more information.
Create a microbatch
The microbatch combines all of the settings added to the scheduler so far to define the individual COPY statements that the scheduler uses to load data from Kafka.
The following example uses all of the settings created in the previous examples to create a microbatch called weblog.
$ vkconfig microbatch --create --microbatch weblog --target-schema public --target-table web_hits \
--add-source web_hits --add-source-cluster kafka_weblog --load-spec weblog_load \
--conf weblog.conf
For microbatches that might benefit from a reduced transaction size, consider using the --max-parallelism option when creating the microbatch. This option splits a single microbatch with multiple partitions into the specified number of simultaneous COPY statements consisting of fewer partitions.
See Microbatch tool options for more information about --max-parallelism and other options.
Launch the scheduler
Once you've created at least one microbatch, you can run your scheduler. You start your scheduler using the launch tool, passing it the name of the scheduler's schema. The scheduler begins scheduling microbatch loads for every enabled microbatch defined in its schema.
The following example launches the weblog_sched scheduler defined in the previous steps. It uses the nohup command to prevent the scheduler from being killed when the user logs out, and redirects stdout and stderr to prevent a nohup.out file from being created.
$ nohup vkconfig launch --conf weblog.conf >/dev/null 2>&1 &
Important
Vertica does not recommend specifying a password on the command line. Passwords on the command line can be exposed by the system's list of processes, which shows the command line for each process. Instead, put the password in a configuration file. Make sure the configuration file's permissions only allow it to be read by the user.
See Launch tool options for more information.
Checking that the scheduler is running
Once you have launched your scheduler, you can verify that it is running by querying the stream_microbatch_history table in the scheduler's schema. This table lists the results of each microbatch the scheduler has run.
For example, this query lists the microbatch name, the start and end times of the microbatch, the start and end offset of the batch, and why the batch ended. The results are ordered to start from when the scheduler was launched:
=> SELECT microbatch, batch_start, batch_end, start_offset,
end_offset, end_reason
FROM weblog_sched.stream_microbatch_history
ORDER BY batch_start ASC LIMIT 10;
 microbatch |        batch_start         |         batch_end          | start_offset | end_offset |  end_reason
------------+----------------------------+----------------------------+--------------+------------+---------------
 weblog     | 2017-10-04 09:30:19.100752 | 2017-10-04 09:30:20.455739 |           -2 |      34931 | END_OF_STREAM
 weblog     | 2017-10-04 09:30:49.161756 | 2017-10-04 09:30:49.873389 |        34931 |      34955 | END_OF_STREAM
 weblog     | 2017-10-04 09:31:19.25731  | 2017-10-04 09:31:22.203173 |        34955 |      35274 | END_OF_STREAM
 weblog     | 2017-10-04 09:31:49.299119 | 2017-10-04 09:31:50.669889 |        35274 |      35555 | END_OF_STREAM
 weblog     | 2017-10-04 09:32:19.43153  | 2017-10-04 09:32:20.7519   |        35555 |      35852 | END_OF_STREAM
 weblog     | 2017-10-04 09:32:49.397684 | 2017-10-04 09:32:50.091675 |        35852 |      36142 | END_OF_STREAM
 weblog     | 2017-10-04 09:33:19.449274 | 2017-10-04 09:33:20.724478 |        36142 |      36444 | END_OF_STREAM
 weblog     | 2017-10-04 09:33:49.481563 | 2017-10-04 09:33:50.068068 |        36444 |      36734 | END_OF_STREAM
 weblog     | 2017-10-04 09:34:19.661624 | 2017-10-04 09:34:20.639078 |        36734 |      37036 | END_OF_STREAM
 weblog     | 2017-10-04 09:34:49.612355 | 2017-10-04 09:34:50.121824 |        37036 |      37327 | END_OF_STREAM
(10 rows)