这是本节的多页打印视图。
点击此处打印.
返回本页常规视图.
使用调度程序自动使用来自 Kafka 的数据
Vertica 提供了一个调度程序,可加载来自一个或多个 Kafka 主题的流消息。与手动使用 COPY 相比,自动加载流式传输数据有诸多优势:
-
流数据会自动出现在数据库中。新数据出现在数据库中的频率由调度程序的时间范围持续时间控制。
-
调度程序提供了一个只使用一次的过程。调度程序为您管理偏移量,以便 Kafka 发送的每条消息都会使用一次。
-
可以将备份调度程序配置为提供高可用性。如果主调度程序由于某种原因失败,备用调度程序将自动接管数据加载。
-
调度程序管理用于加载数据的资源。可通过分配给调度程序的资源池设置来控制调度程序对资源的使用情况。在手动加载时,必须考虑在加载时所使用的资源。
使用调度程序也有一些缺点,可能不适合您的需求。您可能会发现调度程序无法提供加载过程所需的灵活性。例如,调度程序无法在加载事务期间执行业务逻辑。如果需要执行这种处理,最好创建自己的加载过程。此过程会定期运行 COPY 语句来加载 Kafka 中的数据。然后,它会在提交事务之前执行所需的业务逻辑处理。
有关作业调度程序要求的信息,请参考 Apache Kafka 集成。
作业调度程序的功能
调度程序负责安排从 Kafka 加载数据。调度程序的基本处理单元是一个时间范围,也就是一段时间。在每一时间范围内,调度程序为要运行的每个活动微批处理分配一段时间。每个微批处理负责从单个来源加载数据。一旦时间范围结束,调度程序将开始下一时间范围。调度程序继续这个过程,直到您停止它为止。
调度程序剖析
每个调度程序都有多组设置,每组设置控制数据加载的一个方面。这些组包括:
-
调度程序本身,用来定义配置架构、时间范围持续时间和资源池。
-
群集,用来定义 Kafka 群集中与调度程序联系来加载数据的数据。每个调度程序可以包含多个群集,从而允许使用单个调度程序来从多个 Kafka 群集加载数据。
-
源,用来定义 Kafka 主题及这些主题中从中读取数据的分区。
-
目标,用来定义 Vertica 中将接收数据的表。这些表可以是传统的 Vertica 数据库表,也可以是 Flex 表。
-
加载规范,用来定义 Vertica 在加载数据时使用的设置。这些设置包括 Vertica 加载数据时需要使用的解析器和筛选器。例如,如果要读取采用 Avro 格式的 Kafka 主题,则加载规范需要指定 Avro 解析器和架构。
-
微批处理,用来 表示来自 Kafka 流的数据负载的各个分段。它们结合了您使用其他 vkconfig 工具创建的群集、源、目标和负载规范的定义。该调度程序使用微批处理中的所有信息执行 COPY 语句,并使用 KafkaSource UDL 函数将数据从 Kafka 传输到 Vertica。每个微批处理负载的统计信息均存储在 stream_microbatch_history 表中。
vkconfig 脚本
可以使用名为 vkconfig 的 Linux 命令行脚本来创建、配置和运行调度程序。此脚本安装在 Vertica 主机及 Vertica 服务器上的以下路径:
/opt/vertica/packages/kafka/bin/vkconfig
注意
可以在非 Vertica 主机上安装和使用 vkconfig 实用程序。在以下情况下,可能需要这样做:
在主机上安装 vkconfig 最简单的方法是安装 Vertica 服务器 RPM。必须使用与数据库群集上安装的 Vertica 版本相匹配的 RPM。安装 RPM 后,请勿创建数据库。vkconfig 实用程序及其关联文件将位于主机上的 /opt/vertica/packages/kafka/bin
目录中。
vkconfig 脚本包含多个工具。Vkconfig 脚本的第一个实参始终为您希望使用的工具。每个工具执行一个功能,例如更改一组设置(如群集或源)或启动和停止调度程序。例如,要创建或配置调度程序,可以使用以下命令:
$ /opt/vertica/packages/kafka/bin/vkconfig scheduler other options...
创建调度程序时会发生什么
在创建新调度程序时,vkconfig 脚本执行以下步骤:
验证调度程序
在创建或配置调度程序时,调度程序会验证以下设置:
-
确认指定群集中的所有代理都存在。
-
连接到指定的主机,并检索 Kafka 群集中所有代理的列表。获取此列表始终确保调度程序拥有所有代理的最新列表。如果主机是已经定义的群集的一部分,则调度程序将取消配置。
-
确认指定的源是否存在。如果该源不再存在,则禁用该源。
-
检索源中的分区数目。如果从源检索到的分区数与调度程序保存的分区值不同,Vertica 将使用从群集中的源检索到的分区数更新该调度程序。
可以使用 vkconfig 脚本的调度程序工具中的 --validation-type
选项禁用验证。有关详细信息,请参阅调度程序工具选项。
同步调度程序
默认情况下,调度程序自动与 Kafka 主机群集同步其配置和源信息。可以使用 --config-refresh
调度程序实用程序选项配置同步间隔。在每个间隔,调度程序都会:
可以使用 vkconfig 脚本的调度程序工具中的 --auto-sync
选项配置同步设置。 调度程序工具选项 了解详细信息。
启动调度程序
可以使用 vkconfig 脚本的启动工具启动调度程序。
启动调度程序时,它会从源收集数据,从指定的偏移量开始。您可以查看 stream_microbatch_history 表,了解调度程序在任何给定时间执行的操作。
要了解如何创建、配置和启动调度程序,请参阅本指南中的设置调度程序。
您也可以选择屏蔽调度程序。例如,您可能希望使用指定的偏移量范围执行一次加载。有关详细信息,请参阅本指南中的手动使用来自 Kafka 的数据。
如果 Vertica 群集下线,则调度程序将尝试重新连接,但失败。当群集重新启动时,必须重新启动调度程序。
管理正在运行的调度程序
从命令行启动调度程序时,它在前台运行。它会一直运行,直到您终止(或主机关闭)。通常,您需要将调度程序作为守护程序进程来启动,该进程会在主机操作系统启动时或在 Vertica 数据库启动后启动调度程序。
使用 vkconfig 脚本的关闭工具关闭正在运行的调度程序。有关详细信息,请参阅关闭工具选项。
可以在调度程序运行时更改它的大部分设置(例如,添加或更改群集、源、目标和微批处理)。调度程序会自动处理配置更新。
启动多个作业调度程序以实现高可用性
要实现高可用性,您可以面向同一个配置架构启动两个或多个相同的调度程序。可以使用启动工具的 --instance-name
选项来区分调度程序(请参阅启动工具选项)。
活动调度程序加载数据并维护 stream_lock 表上的 S 锁。未使用的调度程序将一直处于备用模式,直到活动调度程序失败或被禁用。如果活动调度程序失败,备用调度程序将立即获取 stream_lock 表上的锁,并从失败的调度程序离开的位置接管从 Kafka 的数据加载。
管理自动加载期间被拒绝的消息
Vertica 在自动加载期间使用解析器定义拒绝消息,这在微批处理加载规范中是必需操作。
调度程序将创建一个拒绝表来自动存储每个微批处理的被拒绝消息。要手动指定拒绝表,请在创建微批处理时使用 --rejection-schema
和 --rejection-table
微批处理实用程序选项。查询 stream_microbatches 表可返回微批处理的拒绝架构和拒绝表。
有关 Vertica 如何处理被拒绝数据的其他详细信息,请参阅处理杂乱的数据。
将选项传递给调度程序的 JVM
调度程序使用 Java 虚拟机,通过 JDBC 连接到 Vertica。可以通过名为 VKCONFIG_JVM_OPTS 的 Linux 环境变量将命令行选项传递给 JVM。将调度程序配置为在连接 Vertica 时使用 TLS/SSL 加密时,此选项非常有用。有关详细信息,请参阅为调度程序配置 TLS 连接。
从 MC 中查看调度程序
可以从 MC 中查看 Kafka 作业的状态。有关详细信息,请参考 查看加载历史记录。
1 - 设置调度程序
您可以使用 Linux 命令行设置调度程序。通常,可以在要运行调度程序的主机上执行配置。该主机可以是您的 Vertica 主机之一,也可以是已安装 vkconfig 实用程序的其他主机(有关详细信息,请参阅 vkconfig 脚本)。
请按照以下步骤设置并启动调度程序以将数据从 Kafka 流式传输到 Vertica:
-
创建配置文件(可选)
-
将 Kafka Bin 目录添加到路径中(可选)
-
为调度程序创建资源池
-
创建调度程序
-
创建群集
-
创建数据表
-
创建源
-
创建目标
-
创建加载规范
-
创建微批处理
-
启动调度程序
上述步骤将在以下部分中进行说明。以下各部分将使用将 Web 日志数据(网站点击量)从 Kafka 加载到 Vertica 表的示例。
创建配置文件(可选)
您在创建调度程序时提供给 vkconfig 脚本的许多实参都不会改变。例如,您经常需要将用户名和密码传递给 Vertica 以授权对数据库进行更改。在每次调用 vkconfig 时添加用户名和密码既繁琐且易出错。
您可以改为使用为您指定这些实参的 --conf
选项向 vkconfig 实用程序传递配置文件。这样可避免大量的打字工作,降低挫败感。
配置文件是一个文本文件,其中每行均包含“关键字=值”对。每个关键字都是一个 vkconfig 命令行选项,例如常用 vkconfig 脚本选项中列出的选项。
以下示例显示了名为 weblog.conf 的配置文件,该文件将用于定义名为 weblog_sched 的调度程序。此示例的剩余部分都会使用此配置文件。
# The configuraton options for the weblog_sched scheduler.
username=dbadmin
password=mypassword
dbhost=vertica01.example.com
dbport=5433
config-schema=weblog_sched
将 vkconfig 目录添加到路径中(可选)
vkconfig 脚本位于 /opt/vertica/packages/kafka/bin
目录中。在每次调用 vkconfig 时键入此路径比较繁琐。您可以使用以下命令将 vkconfig 添加到当前 Linux 会话的搜索路径中:
$ export PATH=/opt/vertica/packages/kafka/bin:$PATH
对于会话的剩余部分,您可以调用 vkconfig 而无需指定其整个路径:
$ vkconfig
Invalid tool
Valid options are scheduler, cluster, source, target, load-spec, microbatch, sync, launch,
shutdown, help
如果要使此设置永久有效,请将导出语句添加到 ~/.profile
文件中。此示例的剩余部分假设您已将此目录添加到 shell 的搜索路径中。
为调度程序创建资源池
Vertica 建议您始终专门为每个调度程序创建一个
资源池。调度程序假定自身独占使用分配的资源池。为调度程序使用单独的池,有助于优化它对 Vertica 群集性能的影响。您可以使用 CREATE RESOURCE POOL 语句在 Vertica 中创建资源池。
以下资源池设置在创建调度程序的资源池时发挥着重要作用:
-
PLANNEDCONCURRENCY 决定调度程序同时发送给 Vertica 的微批处理(COPY 语句)的数量。
-
EXECUTIONPARALLELISM 决定每个节点为处理微批处理分区而创建的最大线程数。
-
QUEUETIMEOUT 提供对资源计时的手动控制。将此参数设置为 0 即可允许调度程序管理计时。
有关这些设置以及如何为调度程序微调资源池的详细信息,请参阅管理调度程序资源和性能。
以下 CREATE RESOURCE POOL 语句将创建可加载 1 个微批处理并处理 1 个分区的资源池:
=> CREATE RESOURCE POOL weblog_pool
MEMORYSIZE '10%'
PLANNEDCONCURRENCY 1
EXECUTIONPARALLELISM 1
QUEUETIMEOUT 0;
如果没有为调度程序创建和分配资源池,它将使用 GENERAL 资源池的一部分。Vertica 建议您不要将 GENERAL 池用于生产环境中使用的调度程序。这种回退到使用 GENERAL 池的举措,是为了在测试调度程序配置期间提供方便。当您准备好部署调度程序时,请创建一个您已根据特定需求进行调整的资源池。每次启动使用 GENERAL 池的调度程序时,vkconfig 实用程序都会显示一条警告消息。
如果没有为调度程序分配足够的资源,则有可能会导致错误。例如,如果调度程序无法从预期的数据帧中加载所有主题的数据,则可能会显示错误:OVERSHOT DEADLINE FOR FRAME。
有关资源池的详细信息,请参阅资源池架构。
创建调度程序
Vertica 包含名为 stream_config 的默认调度程序。您可以使用此调度程序,或者使用 vkconfig 脚本的调度程序工具以及 --create
和 --config-schema
选项来创建新的调度程序:
$ vkconfig scheduler --create --config-schema scheduler_name --conf conf_file
添加使用默认选项的调度程序,只需使用 --create
和 --config-schema
选项即可。此命令会在 Vertica 中创建一个包含调度程序配置的新架构。有关创建调度程序架构的详细信息,请参阅创建调度程序时发生的情况。
您也可使用其他配置参数进一步自定义您的调度程序。有关详细信息,请参阅调度程序工具选项。
下面的例子:
-
使用 --config-schema
选项创建名为 weblog_sched 的调度程序。
-
使用 --operator
选项向名为 kafka_user 的 Vertica 用户授予配置和运行调度程序的权限。dbadmin 用户必须单独指定其他权限。
-
使用 --frame-duration
选项指定七分钟的时间范围持续时间。有关选择时间范围持续时间的详细信息,请参阅选择时间范围持续时间。
-
将调度程序使用的资源池设置为之前创建的 weblog_pool:
$ vkconfig scheduler --create --config-schema weblog_sched --operator kafka_user \
--frame-duration '00:07:00' --resource-pool weblog_pool --conf weblog.conf
注意
从技术角度而言,前面的示例不需要提供 --config-schema
实参,因为它是在 weblog.conf 文件中进行设置的。此示例中显示该实参是为了清晰阐明用途。只要值匹配,在命令行和配置文件中提供该实参没有坏处。如果值不匹配,则优先使用在命令行中指定的值。
创建群集
您必须将至少一个 Kafka 群集与调度程序相关联。调度程序可以访问多个 Kafka 群集。要创建群集,您可以提供群集名称以及 Kafka 群集代理的主机名和端口。
创建群集时,调度程序会尝试通过连接到 Kafka 群集来对其进行验证。如果连接成功,调度程序会自动检索群集中所有代理的列表。因此,您不必在 --hosts
参数中列出每个代理。
以下示例会创建一个名为 kafka_weblog 的群集,其中包含两个 Kafka 代理主机:example.com 域中的 kafka01 和 kafka03。Kafka 代理正在端口 9092 上运行。
$ vkconfig cluster --create --cluster kafka_weblog \
--hosts kafka01.example.com:9092,kafka03.example.com:9092 --conf weblog.conf
有关详细信息,请参阅群集工具选项。
创建源
接下来,至少创建一个供调度程序读取的源。源定义了调度程序将从中加载数据的 Kafka 主题以及该主题包含的分区数。
要创建源并将其与已配置的调度程序相关联,请使用 source
工具。创建源时,Vertica 会连接到 Kafka 群集以验证主题是否存在。因此,在创建源之前,请确保该主题已存在于 Kafka 群集中。由于 Vertica 会验证主题是否存在,因此您必须使用 --cluster
选项提供先前定义的群集名称。
以下示例会在上一步中创建的群集上为名为 web_hits 的 Kafka 主题创建源。此主题具有一个分区。
$ vkconfig source --create --cluster kafka_weblog --source web_hits --partitions 1 --conf weblog.conf
注意
--partitions
参数是要加载的分区数,而不是各个分区的列表。例如,如果将此参数设置为 3,则调度程序将从分区 0、1 和 2 加载数据。
有关详细信息,请参阅源工具选项。
创建数据表
在可以为调度程序创建目标之前,您必须在 Vertica 数据库中创建目标表。这是 Vertica 用于存储调度程序从 Kafka 加载的数据的表。您必须决定为目标创建的表类型:
-
使用 CREATE TABLE 语句创建的标准 Vertica 数据库表。此类型的表可高效地存储数据。但是,您必须确保其列与您正在加载的 Kafka 主题中消息的数据格式相匹配。不能将复杂类型的数据加载到标准 Vertica 表中。
-
使用 CREATE FLEXIBLE TABLE 创建的 Flex 表。Flex 表的效率低于标准 Vertica 数据库表。但是,它足够灵活,可以处理架构不断变化的数据。此外,还可以加载标准 Vertica 表无法加载的大多数复杂数据类型(例如地图和列表)。
重要
避免目标表中出现含有主键限制的列。如果调度程序遇到的行中具有违反此限制的值,则会停止加载数据。如果您必须设置主键限制列,请尝试在调度程序加载之前筛选出该列在流数据中的任何冗余值。
此示例中的数据采用的是设定好的格式,因此最好使用标准 Vertica 表。以下示例会创建名为 web_hits 的表来保存四列数据。此表位于公共架构中。
=> CREATE TABLE web_hits (ip VARCHAR(16), url VARCHAR(256), date DATETIME, user_agent VARCHAR(1024));
注意
您无需创建拒绝表来存储被拒绝的消息。调度程序会自动创建拒绝表。
创建目标
创建目标表后,您可以创建调度程序的目标。目标会告知调度程序在何处存储从 Kafka 检索到的数据。创建目标时,此表必须存在。您可以使用带有 --target-schema
和 --target_table
选项的 vkconfig 脚本的目标工具来指定 Vertica 目标表的架构和名称。以下示例会为在上一步中创建的表添加目标。
$ vkconfig target --create --target-schema public --target-table web_hits --conf weblog.conf
有关详细信息,请参阅目标工具选项。
创建加载规范
调度程序的加载规范提供 Vertica 在解析从 Kafka 加载的数据时使用的参数。最重要的选项是 --parser
,可设置 Vertica 用于解析数据的解析器。您可以使用三个解析器选项:
在此示例中,正在从 Kafka 加载的数据采用 JSON 格式。以下命令会创建名为 weblog_load 的加载规范并将解析器设置为 KafkaJSONParser。
$ vkconfig load-spec --create --parser KafkaJSONParser --load-spec weblog_load --conf weblog.conf
有关详细信息,请参阅加载规范工具选项。
创建微批处理
微批处理将合并到目前为止添加到调度程序的所有设置,以定义调度程序用于从 Kafka 加载数据的各个 COPY 语句。
以下示例使用前面示例中创建的所有设置来创建名为 weblog 的微批处理。
$ vkconfig microbatch --create --microbatch weblog --target-schema public --target-table web_hits \
--add-source web_hits --add-source-cluster kafka_weblog --load-spec weblog_load \
--conf weblog.conf
对于可能受益于事务大小减小的微批处理,请考虑在创建微批处理时使用 --max-parallelism
选项。此选项会将具有多个分区的单个微批处理拆分为由较少分区组成的指定数量的同步 COPY 语句。
有关 --max-parallelism
和其他选项的详细信息,请参阅微批处理工具选项。
启动调度程序
创建至少一个微批处理后,就可以运行调度程序。您可以使用启动工具来启动调度程序,并将调度程序架构的名称传递给它。调度程序会开始为在其架构中定义的每个已启用微批处理调度微批处理加载。
以下示例会启动在前面步骤中定义的 weblog 调度程序。该调度程序会使用 nohup 命令来防止调度程序在用户注销时被终止,并重定向 stdout 和 stderr 来防止创建 nohup.out 文件。
$ nohup vkconfig launch --conf weblog.conf >/dev/null 2>&1 &
重要
Vertica 不建议通过命令行指定密码。命令行中的密码可以通过系统的进程列表公开,该列表会为每个进程显示命令行。应改为将密码保存在配置文件中。确保配置文件的权限仅允许用户读取配置文件。
有关详细信息,请参阅启动工具选项。
检查调度程序是否正在运行
启动调度程序后,您可以通过查询调度程序架构中的 stream_microbatch_history 表来验证它是否正在运行。此表列出了调度程序已运行的每个微批处理的结果。
例如,此查询列出了微批处理名称、微批处理的开始时间和结束时间、批处理的开始偏移量和结束偏移量,以及批处理结束的原因。结果从调度程序已启动时开始排序:
=> SELECT microbatch, batch_start, batch_end, start_offset,
end_offset, end_reason
FROM weblog_sched.stream_microbatch_history
ORDER BY batch_start DESC LIMIT 10;
microbatch | batch_start | batch_end | start_offset | end_offset | end_reason
------------+----------------------------+----------------------------+--------------+------------+---------------
weblog | 2017-10-04 09:30:19.100752 | 2017-10-04 09:30:20.455739 | -2 | 34931 | END_OF_STREAM
weblog | 2017-10-04 09:30:49.161756 | 2017-10-04 09:30:49.873389 | 34931 | 34955 | END_OF_STREAM
weblog | 2017-10-04 09:31:19.25731 | 2017-10-04 09:31:22.203173 | 34955 | 35274 | END_OF_STREAM
weblog | 2017-10-04 09:31:49.299119 | 2017-10-04 09:31:50.669889 | 35274 | 35555 | END_OF_STREAM
weblog | 2017-10-04 09:32:19.43153 | 2017-10-04 09:32:20.7519 | 35555 | 35852 | END_OF_STREAM
weblog | 2017-10-04 09:32:49.397684 | 2017-10-04 09:32:50.091675 | 35852 | 36142 | END_OF_STREAM
weblog | 2017-10-04 09:33:19.449274 | 2017-10-04 09:33:20.724478 | 36142 | 36444 | END_OF_STREAM
weblog | 2017-10-04 09:33:49.481563 | 2017-10-04 09:33:50.068068 | 36444 | 36734 | END_OF_STREAM
weblog | 2017-10-04 09:34:19.661624 | 2017-10-04 09:34:20.639078 | 36734 | 37036 | END_OF_STREAM
weblog | 2017-10-04 09:34:49.612355 | 2017-10-04 09:34:50.121824 | 37036 | 37327 | END_OF_STREAM
(10 rows)
2 - 选择时间范围持续时间
调度程序的一个关键设置是它的时间范围持续时间。持续时间设置调度程序运行您为其定义的所有微批处理的时间量。此设置对如何从 Apache Kafka 加载数据有显著的影响。
在每一时间范围期间会发生什么
要了解正确的时间范围持续时间,首先需要了解在每一时间范围期间发生了什么。
时间范围持续时间在添加到调度程序的微批处理之间分配。此外,每一时间范围都有一些开销,需要花时间处理微批处理。在每个微批处理中,也有一些开销,这减少了微批处理在从 Kafka 加载数据方面花费时间。下图大致显示了每一时间范围是如何划分的:
正如您所见,时间范围中只有一部分时间用于实际加载流式传输数据。
调度程序如何对微批处理排列优先顺序
首先,调度程序将时间范围中的时间均匀地分给各个微批处理。然后,它依次运行每个微批处理。
在每个微批处理中,专门花大量时间来通过 COPY 语句加载数据。此语句使用 KafkaSource UDL 加载数据。它会一直运行到发生以下两种情况之一:
当调度程序处理时间范围时,它会记下哪些微批处理提前完成。然后,它会安排它们在下一时间范围首先运行。如果以这种方式安排微批处理,则可以让调度程序将时间范围中的更多时间分配给在加载数据方面花费最多时间的微批处理(或许还没有足够的时间到达其数据流的末端)。
例如,请考虑下图。在第一时间范围期间,调度程序在微批处理之间平均分配时间。微批处理 #2 使用分配给它的所有时间(如填充区域所示),而其他微批处理并未这样。在下一时间范围中,调度程序重新排列这些微批处理,使提前完成的微批处理首先进行。它还将更少的时间分配给运行时间较短的微批处理。假定这些微批处理再次提前完成,调度程序便能够将时间范围中的剩余时间分配给微批处理 #2。当调度程序运行时,这种优先级的转移将会继续进行。如果一个主题的流量激增,调度程序会为读取该主题的微批处理分配更多的时间作为补偿。
时间范围持续时间太短会发生什么
如果将调度程序的时间范围持续时间设置得太短,微批处理可能没有足够的时间来加载它们负责读取的数据流中的所有数据。在最坏的情况下,当在每一时间范围期间读取大容量主题时,微批处理可能会落后更多。如果不加以解决,这个问题可能会导致消息永远无法加载,因为在微批处理有机会读取它们之前,它们就已经在数据流中过期了。
在极端情况下,调度程序可能无法在每一时间范围期间中运行每一个微批处理。如果时间范围持续时间太短,以至于大部分时间都花在开销任务(如提交数据和准备运行微批处理)上,便会发生这个问题。每个微批处理为了从 Kafka 加载数据而运行的 COPY 语句的最短持续时间为 1 秒。再加上处理数据加载的开销。一般来说,如果时间范围持续时间短于 2 秒乘以调度程序中的微批处理数,那么一些微批处理可能会没有机会在每一时间范围中运行。
如果调度程序在一个时间范围期间运行每个微批处理超时,那么它在下一时间范围期间会通过优先安排前一时间范围中没有运行的微批处理来做出补偿。这个策略确保每个微批处理都有机会加载数据。但是,这并不能解决问题的根本原因。最好的解决方案是增加时间范围持续时间,为每个微批处理分配足够的时间来在每一时间范围期间加载数据。
时间范围持续时间太长会发生什么
长时间范围持续时间的一个缺点是增加了数据延迟。这个延迟是从 Kafka 发出数据到可在数据库中查询数据之间的时间。较长的时间范围持续时间意味着微批处理的每次执行之间有更多的时间。这意味着更新数据库中的数据之间的间隔时间更长。
这种延迟可能并不重要,具体取决于应用。在确定时间范围持续时间时,请考虑数据可能会延迟到整个时间范围持续时间是否将导致问题。
使用长时间范围持续时间时要考虑的另一个问题是,关闭调度程序所需的时间。只有在当前 COPY 语句完成之后,调度程序才会关闭。根据时间范围持续时间的长度,这个过程可能需要几分钟。
最短时间范围持续时间
为添加到调度程序的每个微批处理至少分配两秒钟。如果时间范围持续时间短于该下限,vkconfig 实用程序将会发出警告。在大多数情况下,您都希望时间范围持续时间更长。如果每个微批处理两秒钟,几乎没有时间来加载数据。
平衡时间范围持续时间要求
要确定部署的最佳时间范围持续时间,请考虑您对数据延迟的敏感程度。如果不是在对来自 Kafka 的数据流式传输执行时间敏感型查询,可以使用默认的 5 分钟甚至更长的时间范围持续时间。如果需要数据延迟更短,那么请考虑从 Kafka 读取的数据量。如果时间范围持续时间较短,那么大容量数据或者流量有明显峰值的数据可能会导致问题。
针对不同需求使用不同的调度程序
假定您要加载的流式传输数据来自少数您希望以低延迟查询的 Kafka 主题,以及其他具有大量数据但您可以承受更长时间延迟的主题。在这种情况下,选择“中间”时间范围持续时间可能无法满足任一需求。更好的解决方案是使用多个调度程序:创建两个调度程序,其中一个调度程序的时间范围持续时间较短,仅读取需要以低延迟查询的主题;另一个调度程序的时间范围持续时间较长,用于从大容量主题加载数据。
例如,假定您要通过 Kafka 将流式传输数据从物联网 (IOT) 传感器网络加载到 Vertica。您可以使用其中的大部分数据定期生成报告并更新仪表板显示。这两个用例对时间都不是特别敏感。但是,正在从中进行加载的三个主题确实包含时间敏感型数据(系统故障、入侵检测和连接中断),这些数据必须立即触发警报。
在这种情况下,可以创建两个调度程序,其中一个调度程序的时间范围持续时间为 5 分钟或更长,用于读取包含非关键数据的大多数主题;第二个调度程序的时间范围持续时间至少为 6 秒(但最好更长),用于仅加载来自三个时间敏感型主题的数据。希望这些主题中的数据量足够小,以至于短时间范围持续时间不会导致问题。
3 - 管理调度程序资源和性能
调度程序的性能受调度程序中的微批处理数、每个微批处理中的分区和 Vertica 群集中的节点影响。使用资源池为调度程序分配一部分系统资源,并微调这些资源,以优化 Vertica 的自动加载。
以下各节详细介绍了调度程序资源池配置和处理场景:
调度程序和资源池
Vertica 建议您始终专门为每个调度程序创建一个
资源池。调度程序假定自身独占使用分配的资源池。为调度程序使用单独的池,有助于优化它对 Vertica 群集性能的影响。您可以使用 CREATE RESOURCE POOL 语句在 Vertica 中创建资源池。
如果没有为调度程序创建和分配资源池,它将使用 GENERAL 资源池的一部分。Vertica 建议您不要将 GENERAL 池用于生产环境中使用的调度程序。这种回退到使用 GENERAL 池的举措,是为了在测试调度程序配置期间提供方便。当您准备好部署调度程序时,请创建一个您已根据特定需求进行调整的资源池。每次启动使用 GENERAL 池的调度程序时,vkconfig 实用程序都会显示一条警告消息。
如果没有为调度程序分配足够的资源,则有可能会导致错误。例如,如果调度程序无法从预期的数据帧中加载所有主题的数据,则可能会显示错误:OVERSHOT DEADLINE FOR FRAME。
有关资源池的详细信息,请参阅资源池架构。
关键资源池设置
微批处理是一个工作单元,它在一个时间范围持续时间内处理单个 Kafka 主题的分区。以下资源池设置在 Vertica 如何加载微批处理和处理分区中发挥着重要作用:
- PLANNEDCONCURRENCY 决定调度程序同时发送给 Vertica 的微批处理(COPY 语句)的数量。在每一时间范围开始时,调度程序都会创建 PLANNEDCONCURRENCY 指定的调度程序线程数。每个调度程序线程连接到 Vertica,每次加载一个微批处理。如果微批处理数大于调度程序线程数,调度程序将额外的微批处理放入队列,并在线程变得可用时加载它们。
- EXECUTIONPARALLELISM 决定每个节点用于处理微批处理分区的最大线程数。当一个微批处理加载到 Vertica 时,它的分区会均匀地分布在群集中的节点之间。在每一时间范围期间,节点最多为每个分区创建一个线程。每个线程一次从一个分区读取数据,直到处理完成或该时间范围结束。如果分区数大于所有节点上的线程数,则在线程可用时处理剩余的分区。
- QUEUETIMEOUT 提供对资源计时的手动控制。将资源池参数 QUEUETIMEOUT 设置为 0 即可允许调度程序管理计时。在所有的微批处理都得到处理后,调度程序将等待该时间范围的剩余时间来处理下一个微批处理。具有适当大小的配置包括为了应对流量高峰而规划的休息时间。有关时间范围持续时间大小所带来影响的信息,请参阅选择时间范围持续时间。
例如,以下 CREATE RESOURCE POOL 语句将创建一个名为 weblogs_pool 的资源池,它同时加载 2 个微批处理。Vertica 群集中的每个节点为每个微批处理创建 10 个线程来处理分区:
=> CREATE RESOURCE POOL weblogs_pool
MEMORYSIZE '10%'
PLANNEDCONCURRENCY 2
EXECUTIONPARALLELISM 10
QUEUETIMEOUT 0;
对于三节点 Vertica 群集,weblogs_pool 为每个节点提供的资源将最多创建 10 个线程来处理分区,或者每个微批处理最多总共 30 个线程。
并发加载多个微批处理
在某些情况下,调度程序中的微批处理可能比可用的 PLANNEDCONCURRENCY 多。下面的几张图说明了当没有足够的调度程序线程同时加载每个微批处理时,调度程序如何将微批处理加载到单个 Vertica 节点中。虽然资源池的 PLANNEDCONCURRENCY (PC) 设置为 2,但是调度程序必须加载三个微批处理:A、B 和 C。为简单起见,将 EXECUTIONPARALLELISM (EP) 设置为 1。
首先,调度程序加载微批处理 A 和微批处理 B,而微批处理 C 等待:
当任何一个微批处理完成加载后,调度程序加载任何剩余的微批处理。在下图中,微批处理 A 已完全加载到 Vertica 中。调度程序继续加载微批处理 B,并使用新的可用调度程序线程加载微批处理 C:
调度程序继续发送数据,直到将所有微批处理加载到 Vertica 中,或该时间范围结束。
试用 PLANNEDCONCURRENCY 来优化性能。请注意,设置得过高可能会在每一时间范围的开始创建太多的连接,从而导致 Vertica 或 Kafka 的可扩展性压力。将 PLANNEDCONCURRENCY 设置得过低则不能充分利用 Vertica 的多处理能力。
Vertica 中的并行处理
资源池设置 EXECUTIONPARALLELISM 限制每个 Vertica 节点为处理分区创建的线程数。下图演示了在 EXECUTIONPARALLELISM 不足以为每个分区创建一个线程时,三节点 Vertica 群集如何处理具有九个分区的主题。这些分区均匀地分布在 Vertica 群集中的节点 1、节点 2 和节点 3 之间。调度程序的资源池将 PLANNEDCONCURRENCY (PC) 设置为 1,将 EXECUTIONPARALLELISM (EP) 设置为 2,因此当调度程序加载微批处理 A 时,每个节点最多创建 2 个线程。每个线程一次从一个分区进行读取。没有为其分配线程的分区必须等待处理:
当线程处理完为其分配的分区后,剩下的分区将在线程可用时分配给线程:
在调度程序的资源池上设置 EXECUTIONPARALLELISM 时,请考虑调度程序中所有微批处理的分区数。
并发加载分区主题
具有多个分区的单个主题可能会从增加并行加载或降低事务大小中受益。使用 --max-parallelism
微批处理,可以动态地将具有多个分区的主题拆分为多个负载均衡的微批处理,每个微批处理均由原始微批处理的分区的子集组成。调度程序使用其资源池中可用的 PLANNEDCONCURRENCY 来同时加载动态拆分的微批处理。
调度程序资源池中的 EXECUTIONPARALLELISM 设置决定每个节点为处理其单个微批处理的部分分区而创建的最大线程数。拆分微批处理使每个节点都能够为同一工作单元创建更多的线程。当有足够的 PLANNEDCONCURRENCY 并且每个节点分配的分区数大于调度程序资源池中的 EXECUTIONPARALLELISM 设置时,使用 --max-parallelism
拆分微批处理并在每个节点上创建更多线程来并行处理更多分区。
下图演示了双节点 Vertica 群集如何使用 PLANNEDCONCURRENCY (PC) 和 EXECUTIONPARALLELISM (EP) 均设置为 2 的资源池来加载和处理微批处理 A。因为调度程序只加载一个微批处理,所以有 1 个调度程序线程未使用。每个节点为每个调度程序线程创建 2 个线程来处理分配给它的分区:
将微批处理 A 的 --max-parallelism
选项设为 2 将使调度程序可以动态地将微批处理 A 拆分为 2 个更小的微批处理 A1 和 A2。因为有 2 个调度程序线程可用,所以子集微批处理会同时加载到 Vertica 中。每个节点为每个调度程序线程创建 2 个线程来处理微批处理 A1 和 A2 的分区:
使用 --max-parallelism
防止由大容量 Kafka 主题组成的微批处理中出现瓶颈。它还为需要额外处理(如文本索引)的微批处理提供更快的加载。
4 - 对 Kafka 调度程序使用连接负载均衡
在 --dbhost
选项或配置文件中的 dbhost 条目中向调度程序提供 Vertica 节点的名称。调度程序连接到此节点以初始化它用于从 Kafka 加载数据而执行的所有语句。例如,每次执行微批处理时,调度程序都会连接到同一节点来运行 COPY 语句。如果使用单个节点作为所有调度程序操作的启动程序节点,这会影响该节点的性能,进而影响整个数据库的性能。
为了避免单个节点成为瓶颈,可以使用连接负载均衡将运行调度程序语句的负载分散到数据库中的多个节点上。连接负载均衡在负载均衡组内的节点之间分配客户端连接。有关此功能的概述,请参阅关于本机连接负载均衡。
为调度程序启用连接负载均衡是一个两步骤过程:
-
为调度程序选择或创建负载均衡策略。
-
在调度程序中启用负载均衡。
为调度程序选择或创建负载均衡策略
连接负载均衡策略将来自特定网络地址集的传入连接重定向到一组节点。如果您的数据库已定义合适的负载均衡策略,那么可以直接使用它,而不是专门为调度程序创建一个。
如果数据库没有合适的策略,请创建一个。让您的策略将来自运行 Kafka 调度程序的主机的 IP 地址的连接重定向到数据库中的一组节点。您选择的节点组将充当调度程序所执行语句的启动程序。
重要
在
Eon 模式数据库中,仅包括属于调度程序负载均衡组中
主子群集的节点。这些节点加载数据的效率最高。
以下示例演示了如何为三节点数据库中的所有三个节点设置负载均衡策略。调度程序在数据库中的节点 1 上运行,因此路由规则的源地址范围 (192.168.110.0/24) 包含数据库中节点的 IP 地址。示例的最后一步验证是否已对来自第一个节点(IP 地址 10.20.110.21)的连接进行负载均衡。
=> SELECT node_name,node_address,node_address_family FROM v_catalog.nodes;
node_name | node_address | node_address_family
------------------+--------------+----------------------
v_vmart_node0001 | 10.20.110.21 | ipv4
v_vmart_node0002 | 10.20.110.22 | ipv4
v_vmart_node0003 | 10.20.110.23 | ipv4
(4 rows)
=> CREATE NETWORK ADDRESS node01 ON v_vmart_node0001 WITH '10.20.110.21';
CREATE NETWORK ADDRESS
=> CREATE NETWORK ADDRESS node02 ON v_vmart_node0002 WITH '10.20.110.22';
CREATE NETWORK ADDRESS
=> CREATE NETWORK ADDRESS node03 on v_vmart_node0003 WITH '10.20.110.23';
CREATE NETWORK ADDRESS
=> CREATE LOAD BALANCE GROUP kafka_scheduler_group WITH ADDRESS node01,node02,node03;
CREATE LOAD BALANCE GROUP
=> CREATE ROUTING RULE kafka_scheduler_rule ROUTE
'10.20.110.0/24' TO kafka_scheduler_group;
CREATE ROUTING RULE
=> SELECT describe_load_balance_decision('10.20.110.21');
describe_load_balance_decision
--------------------------------------------------------------------------------
Describing load balance decision for address [10.20.110.21]
Load balance cache internal version id (node-local): [2]
Considered rule [kafka_scheduler_rule] source ip filter [10.20.110.0/24]...
input address matches this rule
Matched to load balance group [kafka_scheduler_group] the group has policy [ROUNDROBIN]
number of addresses [3]
(0) LB Address: [10.20.110.21]:5433
(1) LB Address: [10.20.110.22]:5433
(2) LB Address: [10.20.110.23]:5433
Chose address at position [1]
Routing table decision: Success. Load balance redirect to: [10.20.110.23] port [5433]
(1 row)
重要
在 Vertica 节点上运行调度程序时请小心,要么将其 dbhost 名称设置为 localhost,要么不指定值(这意味着 dbhost 默认为 localhost)。到 localhost 的连接使用环回 IP 地址 127.0.0.1,而不是节点的主网络地址。如果创建的负载均衡路由规则重定向来自节点的 IP 地址范围的传入连接,那么它将不适用于使用 localhost 建立的连接。最好的解决方案是使用节点的 IP 地址或 FQDN 作为 dbhost 设置。
如果调度程序从一个没有应用路由规则的 IP 地址连接到 Vertica,将在 vertica.log 中看到类似如下的消息:
[Session] <INFO> Load balance request from client address ::1 had decision:
Classic load balancing considered, but either the policy was NONE or no target was
available. Details: [NONE or invalid]
<LOG> @v_vmart_node0001: 00000/5789: Connection load balance request
refused by server
在调度程序中启用负载均衡
客户端必须选择加入负载均衡,Vertica 才能将连接负载均衡策略应用到连接。例如,必须将 -C
标记传递给 vsql 命令,才能对您的交互式会话实现负载均衡。
调度程序使用 Java JDBC 库连接到 Vertica。要让调度程序选择加入负载均衡,必须将 JDBC 库的 ConnectionLoadBalance 选项设置为 1。有关详细信息,请参阅JDBC 中的负载均衡。
使用 vkconfig 脚本的 --jdbc-opt
选项,或者将 jdbc-opt 选项添加到配置文件中以设置 ConnectionLoadBalance 选项。例如,要使用名为 weblog.conf 的配置文件从命令行启动调度程序,请使用以下命令:
$ nohup vkconfig launch --conf weblog.conf --jdbc-opt ConnectionLoadBalance=1 >/dev/null 2>&1 &
要永久启用负载均衡,可以将负载均衡选项添加到配置文件中。以下示例显示了设置调度程序内配置为使用连接负载均衡的示例中的 weblog.conf 文件。
username=dbadmin
password=mypassword
dbhost=10.20.110.21
dbport=5433
config-schema=weblog_sched
jdbc-opt=ConnectionLoadBalance=1
可通过查询 SESSIONS 表来检查是否正在对调度程序的连接进行负载均衡:
=> SELECT node_name, user_name, client_label FROM V_MONITOR.SESSIONS;
node_name | user_name | client_label
------------------+-----------+-------------------------------------------
v_vmart_node0001 | dbadmin | vkstream_weblog_sched_leader_persistent_4
v_vmart_node0001 | dbadmin |
v_vmart_node0002 | dbadmin | vkstream_weblog_sched_lane-worker-0_0_8
v_vmart_node0003 | dbadmin | vkstream_weblog_sched_VDBLogger_0_9
(4 rows)
在 client_labels 列中,调度程序的连接具有以 vkstream 开头的标签(没有客户端标签的行是交互式会话)。可以看到调度程序打开的三个连接都进入了不同的节点。
5 - 使用偏移量限制加载
Kafka 会维护用户可配置的消息积压。默认情况下,新创建的调度程序读取 Kafka 主题中的所有消息,包括积压的所有消息,而不仅仅是调度程序启动后流出的消息。通常,您想要的就是这样。
然而,在某些情况下,您可能只想将源的一部分流传输到表中。例如,假定您想要分析您的电子商务站点在特定日期和时间的 Web 流量。然而,您的 Kafka 主题包含比您想要分析的时间更早的 Web 访问记录。在这种情况下,可以使用偏移量,将需要的数据流传输到 Vertica 进行分析。
另一个常见的用例是当您已经从 Kafka 手动加载了一些数据时(请参阅手动使用来自 Kafka 的数据)。现在,您希望流传输新到达的所有数据。默认情况下,调度程序会重新加载之前加载的所有数据(假定仍然可以从 Kafka 获得)。此时,可以使用偏移量来告诉调度程序从手动数据加载停止的位置开始自动加载数据。
配置调度程序从某个偏移量开始流传输
vkconfig 脚本的微批处理工具有一个 --offset
选项,使用它可以指定希望调度程序开始加载的源中消息的索引。此选项接受逗号分隔的索引值列表。除非使用 --partition
选项,否则必须为源中的每个分区提供一个索引值。此选项可用于选择应用偏移量的分区。在微批处理中设置偏移量时,调度程序不能运行。
如果微批处理定义了多个群集,请使用 --cluster
选项选择偏移量选项适用于哪个群集。类似地,如果微批处理有多个源,则必须使用 --source
选项选择一个。
例如,假定您只想从一个名为 web_hits 的源加载最后 1000 条消息。为了方便起见,假定源只包含单个分区,而微批处理只定义了单个群集和单个主题。
第一个任务是确定流结尾的当前偏移量。可以在一个 Kafka 节点上通过调用 GetOffsetShell 类执行此操作,将时间参数设置为 -1(主题的结尾):
$ path to kafka/bin/kafka-run-class.sh kafka.tools.GetOffsetShell \
--broker-list kafka01:9092,kafka03:9092 --time -1 \
--topic web_hits
{metadata.broker.list=kafka01:9092,kafka03:9092, request.timeout.ms=1000,
client.id=GetOffsetShell, security.protocol=PLAINTEXT}
web_hits:0:8932
还可以使用 GetOffsetShell 查找流中出现在时间戳之前的偏移量。
在上面的示例中,web_hits 主题的单个分区的结尾偏移量为 8932。如果我们希望加载来自源的最后 1000 条消息,我们需要将微批处理的偏移量设置为 8932 - 1001 或 7931。
注意
偏移量的开始包含在 Vertica COPY 语句中。Kafka 的本机起始偏移量是独占的。因此,必须在偏移量上添加 1 才能获得正确的消息数。
计算偏移量后,即可在微批处理配置中设置了。下面的例子:
$ vkconfig shutdown --conf weblog.conf
$ vkconfig microbatch --microbatch weblog --update --conf weblog.conf --offset 7931
$ nohup vkconfig launch --conf weblog.conf >/dev/null 2>&1 &
如果目标表为空,或者在调度程序启动之前被截断,那么表中将有 1000 行(直到通过源流传输更多的消息):
=> select count(*) from web_hits;
count
-------
1000
(1 row)
注意
上一个示例假定 Kafka 主题中最后 1000 条消息的偏移量值是连续分配的。这种假定并不总是正确。由于各种原因,Kafka 主题在其偏移量编号中可能存在空白。偏移量指的是 Kafka 分配给消息的键值,而不是它在主题中的位置。
6 - 在 Vertica 升级后更新调度程序
调度程序仅与创建它的 Vertica 版本兼容。在 Vertica 版本之间,调度程序的配置架构或调度程序调用的 UDx 函数可能会发生更改。在升级 Vertica 后,必须更新调度程序才能收到这些更改。
当您将 Vertica 升级到新的主要版本或服务包时,使用 vkconfig 调度程序工具的 --upgrade
选项来更新调度程序。如果不更新调度程序,则在尝试启动它时将收到一条错误消息。例如:
$ nohup vkconfig launch --conf weblog.conf >/dev/null 2>&1 &
com.vertica.solutions.kafka.exception.FatalException: Configured scheduler schema and current
scheduler configuration schema version do not match. Upgrade configuration by running:
vkconfig scheduler --upgrade
at com.vertica.solutions.kafka.scheduler.StreamCoordinator.assertVersion(StreamCoordinator.java:64)
at com.vertica.solutions.kafka.scheduler.StreamCoordinator.run(StreamCoordinator.java:125)
at com.vertica.solutions.kafka.Launcher.run(Launcher.java:205)
at com.vertica.solutions.kafka.Launcher.main(Launcher.java:258)
Scheduler instance failed. Check log file. Check log file.
$ vkconfig scheduler --upgrade --conf weblog.conf
Checking if UPGRADE necessary...
UPGRADE required, running UPGRADE...
UPGRADE completed successfully, now the scheduler configuration schema version is v8.1.1
$ nohup vkconfig launch --conf weblog.conf >/dev/null 2>&1 &
. . .