设置调度程序
您可以使用 Linux 命令行设置调度程序。通常,可以在要运行调度程序的主机上执行配置。该主机可以是您的 Vertica 主机之一,也可以是已安装 vkconfig 实用程序的其他主机(有关详细信息,请参阅 vkconfig 脚本)。
请按照以下步骤设置并启动调度程序以将数据从 Kafka 流式传输到 Vertica:
上述步骤将在以下部分中进行说明。以下各部分将使用将 Web 日志数据(网站点击量)从 Kafka 加载到 Vertica 表的示例。
创建配置文件(可选)
您在创建调度程序时提供给 vkconfig 脚本的许多实参都不会改变。例如,您经常需要将用户名和密码传递给 Vertica 以授权对数据库进行更改。在每次调用 vkconfig 时添加用户名和密码既繁琐且易出错。
您可以改为使用为您指定这些实参的 --conf
选项向 vkconfig 实用程序传递配置文件。这样可避免大量的打字工作,降低挫败感。
配置文件是一个文本文件,其中每行均包含“关键字=值”对。每个关键字都是一个 vkconfig 命令行选项,例如常用 vkconfig 脚本选项中列出的选项。
以下示例显示了名为 weblog.conf 的配置文件,该文件将用于定义名为 weblog_sched 的调度程序。此示例的剩余部分都会使用此配置文件。
# The configuraton options for the weblog_sched scheduler.
username=dbadmin
password=mypassword
dbhost=vertica01.example.com
dbport=5433
config-schema=weblog_sched
将 vkconfig 目录添加到路径中(可选)
vkconfig 脚本位于 /opt/vertica/packages/kafka/bin
目录中。在每次调用 vkconfig 时键入此路径比较繁琐。您可以使用以下命令将 vkconfig 添加到当前 Linux 会话的搜索路径中:
$ export PATH=/opt/vertica/packages/kafka/bin:$PATH
对于会话的剩余部分,您可以调用 vkconfig 而无需指定其整个路径:
$ vkconfig
Invalid tool
Valid options are scheduler, cluster, source, target, load-spec, microbatch, sync, launch,
shutdown, help
如果要使此设置永久有效,请将导出语句添加到 ~/.profile
文件中。此示例的剩余部分假设您已将此目录添加到 shell 的搜索路径中。
为调度程序创建资源池
Vertica 建议您始终专门为每个调度程序创建一个 资源池。调度程序假定自身独占使用分配的资源池。为调度程序使用单独的池,有助于优化它对 Vertica 群集性能的影响。您可以使用 CREATE RESOURCE POOL 语句在 Vertica 中创建资源池。
以下资源池设置在创建调度程序的资源池时发挥着重要作用:
-
PLANNEDCONCURRENCY 决定调度程序同时发送给 Vertica 的微批处理(COPY 语句)的数量。
-
EXECUTIONPARALLELISM 决定每个节点为处理微批处理分区而创建的最大线程数。
-
QUEUETIMEOUT 提供对资源计时的手动控制。将此参数设置为 0 即可允许调度程序管理计时。
有关这些设置以及如何为调度程序微调资源池的详细信息,请参阅管理调度程序资源和性能。
以下 CREATE RESOURCE POOL 语句将创建可加载 1 个微批处理并处理 1 个分区的资源池:
=> CREATE RESOURCE POOL weblog_pool
MEMORYSIZE '10%'
PLANNEDCONCURRENCY 1
EXECUTIONPARALLELISM 1
QUEUETIMEOUT 0;
如果没有为调度程序创建和分配资源池,它将使用 GENERAL 资源池的一部分。Vertica 建议您不要将 GENERAL 池用于生产环境中使用的调度程序。这种回退到使用 GENERAL 池的举措,是为了在测试调度程序配置期间提供方便。当您准备好部署调度程序时,请创建一个您已根据特定需求进行调整的资源池。每次启动使用 GENERAL 池的调度程序时,vkconfig 实用程序都会显示一条警告消息。
如果没有为调度程序分配足够的资源,则有可能会导致错误。例如,如果调度程序无法从预期的数据帧中加载所有主题的数据,则可能会显示错误:OVERSHOT DEADLINE FOR FRAME。
有关资源池的详细信息,请参阅资源池架构。
创建调度程序
Vertica 包含名为 stream_config 的默认调度程序。您可以使用此调度程序,或者使用 vkconfig 脚本的调度程序工具以及 --create
和 --config-schema
选项来创建新的调度程序:
$ vkconfig scheduler --create --config-schema scheduler_name --conf conf_file
添加使用默认选项的调度程序,只需使用 --create
和 --config-schema
选项即可。此命令会在 Vertica 中创建一个包含调度程序配置的新架构。有关创建调度程序架构的详细信息,请参阅创建调度程序时发生的情况。
您也可使用其他配置参数进一步自定义您的调度程序。有关详细信息,请参阅调度程序工具选项。
下面的例子:
-
使用 --
config-schema
选项创建名为 weblog_sched 的调度程序。 -
使用
--operator
选项向名为 kafka_user 的 Vertica 用户授予配置和运行调度程序的权限。dbadmin 用户必须单独指定其他权限。 -
使用
--frame-duration
选项指定七分钟的时间范围持续时间。有关选择时间范围持续时间的详细信息,请参阅选择时间范围持续时间。 -
将调度程序使用的资源池设置为之前创建的 weblog_pool:
$ vkconfig scheduler --create --config-schema weblog_sched --operator kafka_user \
--frame-duration '00:07:00' --resource-pool weblog_pool --conf weblog.conf
注意
从技术角度而言,前面的示例不需要提供--config-schema
实参,因为它是在 weblog.conf 文件中进行设置的。此示例中显示该实参是为了清晰阐明用途。只要值匹配,在命令行和配置文件中提供该实参没有坏处。如果值不匹配,则优先使用在命令行中指定的值。
创建群集
您必须将至少一个 Kafka 群集与调度程序相关联。调度程序可以访问多个 Kafka 群集。要创建群集,您可以提供群集名称以及 Kafka 群集代理的主机名和端口。
创建群集时,调度程序会尝试通过连接到 Kafka 群集来对其进行验证。如果连接成功,调度程序会自动检索群集中所有代理的列表。因此,您不必在 --hosts
参数中列出每个代理。
以下示例会创建一个名为 kafka_weblog 的群集,其中包含两个 Kafka 代理主机:example.com 域中的 kafka01 和 kafka03。Kafka 代理正在端口 9092 上运行。
$ vkconfig cluster --create --cluster kafka_weblog \
--hosts kafka01.example.com:9092,kafka03.example.com:9092 --conf weblog.conf
有关详细信息,请参阅群集工具选项。
创建源
接下来,至少创建一个供调度程序读取的源。源定义了调度程序将从中加载数据的 Kafka 主题以及该主题包含的分区数。
要创建源并将其与已配置的调度程序相关联,请使用 source
工具。创建源时,Vertica 会连接到 Kafka 群集以验证主题是否存在。因此,在创建源之前,请确保该主题已存在于 Kafka 群集中。由于 Vertica 会验证主题是否存在,因此您必须使用 --cluster
选项提供先前定义的群集名称。
以下示例会在上一步中创建的群集上为名为 web_hits 的 Kafka 主题创建源。此主题具有一个分区。
$ vkconfig source --create --cluster kafka_weblog --source web_hits --partitions 1 --conf weblog.conf
注意
--partitions
参数是要加载的分区数,而不是各个分区的列表。例如,如果将此参数设置为 3,则调度程序将从分区 0、1 和 2 加载数据。
有关详细信息,请参阅源工具选项。
创建数据表
在可以为调度程序创建目标之前,您必须在 Vertica 数据库中创建目标表。这是 Vertica 用于存储调度程序从 Kafka 加载的数据的表。您必须决定为目标创建的表类型:
-
使用 CREATE TABLE 语句创建的标准 Vertica 数据库表。此类型的表可高效地存储数据。但是,您必须确保其列与您正在加载的 Kafka 主题中消息的数据格式相匹配。不能将复杂类型的数据加载到标准 Vertica 表中。
-
使用 CREATE FLEXIBLE TABLE 创建的 Flex 表。Flex 表的效率低于标准 Vertica 数据库表。但是,它足够灵活,可以处理架构不断变化的数据。此外,还可以加载标准 Vertica 表无法加载的大多数复杂数据类型(例如地图和列表)。
重要
避免目标表中出现含有主键限制的列。如果调度程序遇到的行中具有违反此限制的值,则会停止加载数据。如果您必须设置主键限制列,请尝试在调度程序加载之前筛选出该列在流数据中的任何冗余值。此示例中的数据采用的是设定好的格式,因此最好使用标准 Vertica 表。以下示例会创建名为 web_hits 的表来保存四列数据。此表位于公共架构中。
=> CREATE TABLE web_hits (ip VARCHAR(16), url VARCHAR(256), date DATETIME, user_agent VARCHAR(1024));
注意
您无需创建拒绝表来存储被拒绝的消息。调度程序会自动创建拒绝表。创建目标
创建目标表后,您可以创建调度程序的目标。目标会告知调度程序在何处存储从 Kafka 检索到的数据。创建目标时,此表必须存在。您可以使用带有 --target-schema
和 --target_table
选项的 vkconfig 脚本的目标工具来指定 Vertica 目标表的架构和名称。以下示例会为在上一步中创建的表添加目标。
$ vkconfig target --create --target-schema public --target-table web_hits --conf weblog.conf
有关详细信息,请参阅目标工具选项。
创建加载规范
调度程序的加载规范提供 Vertica 在解析从 Kafka 加载的数据时使用的参数。最重要的选项是 --parser
,可设置 Vertica 用于解析数据的解析器。您可以使用三个解析器选项:
-
KafkaAvroParser(适用于采用 Avro 格式的数据)。
-
KafkaJSONParser(适用于采用 JSON 格式的数据)。
-
KafkaJSONParser(用于将每条消息加载到同一个 VARCHAR 字段中)。有关详细信息,请参阅解析自定义格式。
在此示例中,正在从 Kafka 加载的数据采用 JSON 格式。以下命令会创建名为 weblog_load 的加载规范并将解析器设置为 KafkaJSONParser。
$ vkconfig load-spec --create --parser KafkaJSONParser --load-spec weblog_load --conf weblog.conf
有关详细信息,请参阅加载规范工具选项。
创建微批处理
微批处理将合并到目前为止添加到调度程序的所有设置,以定义调度程序用于从 Kafka 加载数据的各个 COPY 语句。
以下示例使用前面示例中创建的所有设置来创建名为 weblog 的微批处理。
$ vkconfig microbatch --create --microbatch weblog --target-schema public --target-table web_hits \
--add-source web_hits --add-source-cluster kafka_weblog --load-spec weblog_load \
--conf weblog.conf
对于可能受益于事务大小减小的微批处理,请考虑在创建微批处理时使用 --max-parallelism
选项。此选项会将具有多个分区的单个微批处理拆分为由较少分区组成的指定数量的同步 COPY 语句。
有关 --max-parallelism
和其他选项的详细信息,请参阅微批处理工具选项。
启动调度程序
创建至少一个微批处理后,就可以运行调度程序。您可以使用启动工具来启动调度程序,并将调度程序架构的名称传递给它。调度程序会开始为在其架构中定义的每个已启用微批处理调度微批处理加载。
以下示例会启动在前面步骤中定义的 weblog 调度程序。该调度程序会使用 nohup 命令来防止调度程序在用户注销时被终止,并重定向 stdout 和 stderr 来防止创建 nohup.out 文件。
$ nohup vkconfig launch --conf weblog.conf >/dev/null 2>&1 &
重要
Vertica 不建议通过命令行指定密码。命令行中的密码可以通过系统的进程列表公开,该列表会为每个进程显示命令行。应改为将密码保存在配置文件中。确保配置文件的权限仅允许用户读取配置文件。有关详细信息,请参阅启动工具选项。
检查调度程序是否正在运行
启动调度程序后,您可以通过查询调度程序架构中的 stream_microbatch_history 表来验证它是否正在运行。此表列出了调度程序已运行的每个微批处理的结果。
例如,此查询列出了微批处理名称、微批处理的开始时间和结束时间、批处理的开始偏移量和结束偏移量,以及批处理结束的原因。结果从调度程序已启动时开始排序:
=> SELECT microbatch, batch_start, batch_end, start_offset,
end_offset, end_reason
FROM weblog_sched.stream_microbatch_history
ORDER BY batch_start DESC LIMIT 10;
microbatch | batch_start | batch_end | start_offset | end_offset | end_reason
------------+----------------------------+----------------------------+--------------+------------+---------------
weblog | 2017-10-04 09:30:19.100752 | 2017-10-04 09:30:20.455739 | -2 | 34931 | END_OF_STREAM
weblog | 2017-10-04 09:30:49.161756 | 2017-10-04 09:30:49.873389 | 34931 | 34955 | END_OF_STREAM
weblog | 2017-10-04 09:31:19.25731 | 2017-10-04 09:31:22.203173 | 34955 | 35274 | END_OF_STREAM
weblog | 2017-10-04 09:31:49.299119 | 2017-10-04 09:31:50.669889 | 35274 | 35555 | END_OF_STREAM
weblog | 2017-10-04 09:32:19.43153 | 2017-10-04 09:32:20.7519 | 35555 | 35852 | END_OF_STREAM
weblog | 2017-10-04 09:32:49.397684 | 2017-10-04 09:32:50.091675 | 35852 | 36142 | END_OF_STREAM
weblog | 2017-10-04 09:33:19.449274 | 2017-10-04 09:33:20.724478 | 36142 | 36444 | END_OF_STREAM
weblog | 2017-10-04 09:33:49.481563 | 2017-10-04 09:33:50.068068 | 36444 | 36734 | END_OF_STREAM
weblog | 2017-10-04 09:34:19.661624 | 2017-10-04 09:34:20.639078 | 36734 | 37036 | END_OF_STREAM
weblog | 2017-10-04 09:34:49.612355 | 2017-10-04 09:34:50.121824 | 37036 | 37327 | END_OF_STREAM
(10 rows)