使用调度程序自动使用来自 Kafka 的数据

Vertica 提供了一个调度程序,可加载来自一个或多个 Kafka 主题的流消息。与手动使用 COPY 相比,自动加载流式传输数据有诸多优势:

  • 流数据会自动出现在数据库中。新数据出现在数据库中的频率由调度程序的时间范围持续时间控制。

  • 调度程序提供了一个只使用一次的过程。调度程序为您管理偏移量,以便 Kafka 发送的每条消息都会使用一次。

  • 可以将备份调度程序配置为提供高可用性。如果主调度程序由于某种原因失败,备用调度程序将自动接管数据加载。

  • 调度程序管理用于加载数据的资源。可通过分配给调度程序的资源池设置来控制调度程序对资源的使用情况。在手动加载时,必须考虑在加载时所使用的资源。

使用调度程序也有一些缺点,可能不适合您的需求。您可能会发现调度程序无法提供加载过程所需的灵活性。例如,调度程序无法在加载事务期间执行业务逻辑。如果需要执行这种处理,最好创建自己的加载过程。此过程会定期运行 COPY 语句来加载 Kafka 中的数据。然后,它会在提交事务之前执行所需的业务逻辑处理。

有关作业调度程序要求的信息,请参考 Apache Kafka 集成

作业调度程序的功能

调度程序负责安排从 Kafka 加载数据。调度程序的基本处理单元是一个时间范围,也就是一段时间。在每一时间范围内,调度程序为要运行的每个活动微批处理分配一段时间。每个微批处理负责从单个来源加载数据。一旦时间范围结束,调度程序将开始下一时间范围。调度程序继续这个过程,直到您停止它为止。

调度程序剖析

每个调度程序都有多组设置,每组设置控制数据加载的一个方面。这些组包括:

  • 调度程序本身,用来定义配置架构、时间范围持续时间和资源池。

  • 群集,用来定义 Kafka 群集中与调度程序联系来加载数据的数据。每个调度程序可以包含多个群集,从而允许使用单个调度程序来从多个 Kafka 群集加载数据。

  • 源,用来定义 Kafka 主题及这些主题中从中读取数据的分区。

  • 目标,用来定义 Vertica 中将接收数据的表。这些表可以是传统的 Vertica 数据库表,也可以是 Flex 表。

  • 加载规范,用来定义 Vertica 在加载数据时使用的设置。这些设置包括 Vertica 加载数据时需要使用的解析器和筛选器。例如,如果要读取采用 Avro 格式的 Kafka 主题,则加载规范需要指定 Avro 解析器和架构。

  • 微批处理,用来 表示来自 Kafka 流的数据负载的各个分段。它们结合了您使用其他 vkconfig 工具创建的群集、源、目标和负载规范的定义。该调度程序使用微批处理中的所有信息执行 COPY 语句,并使用 KafkaSource UDL 函数将数据从 Kafka 传输到 Vertica。每个微批处理负载的统计信息均存储在 stream_microbatch_history 表中。

vkconfig 脚本

可以使用名为 vkconfig 的 Linux 命令行脚本来创建、配置和运行调度程序。此脚本安装在 Vertica 主机及 Vertica 服务器上的以下路径:

/opt/vertica/packages/kafka/bin/vkconfig

vkconfig 脚本包含多个工具。Vkconfig 脚本的第一个实参始终为您希望使用的工具。每个工具执行一个功能,例如更改一组设置(如群集或源)或启动和停止调度程序。例如,要创建或配置调度程序,可以使用以下命令:

$ /opt/vertica/packages/kafka/bin/vkconfig scheduler other options...

创建调度程序时会发生什么

在创建新调度程序时,vkconfig 脚本执行以下步骤:

  • 使用为调度程序指定的名称创建一个新的 Vertica 架构。可以在配置期间使用此名称标识调度程序。

  • 在新创建的架构中创建为了管理 Kafka 数据加载而所需的表。有关详细信息,请参阅数据流式传输架构表

验证调度程序

在创建或配置调度程序时,调度程序会验证以下设置:

  • 确认指定群集中的所有代理都存在。

  • 连接到指定的主机,并检索 Kafka 群集中所有代理的列表。获取此列表始终确保调度程序拥有所有代理的最新列表。如果主机是已经定义的群集的一部分,则调度程序将取消配置。

  • 确认指定的源是否存在。如果该源不再存在,则禁用该源。

  • 检索源中的分区数目。如果从源检索到的分区数与调度程序保存的分区值不同,Vertica 将使用从群集中的源检索到的分区数更新该调度程序。

可以使用 vkconfig 脚本的调度程序工具中的 --validation-type 选项禁用验证。有关详细信息,请参阅调度程序工具选项

同步调度程序

默认情况下,调度程序自动与 Kafka 主机群集同步其配置和源信息。可以使用 --config-refresh 调度程序实用程序选项配置同步间隔。在每个间隔,调度程序都会:

  • 通过在其 Vertica 配置架构中查询其设置,检查调度程序的配置有无更新。

  • 执行验证调度程序中列出的所有检查。

可以使用 vkconfig 脚本的调度程序工具中的 --auto-sync 选项配置同步设置。 调度程序工具选项 了解详细信息。

启动调度程序

可以使用 vkconfig 脚本的启动工具启动调度程序。

启动调度程序时,它会从源收集数据,从指定的偏移量开始。您可以查看 stream_microbatch_history 表,了解调度程序在任何给定时间执行的操作。

要了解如何创建、配置和启动调度程序,请参阅本指南中的设置调度程序

您也可以选择屏蔽调度程序。例如,您可能希望使用指定的偏移量范围执行一次加载。有关详细信息,请参阅本指南中的手动使用来自 Kafka 的数据

如果 Vertica 群集下线,则调度程序将尝试重新连接,但失败。当群集重新启动时,必须重新启动调度程序。

管理正在运行的调度程序

从命令行启动调度程序时,它在前台运行。它会一直运行,直到您终止(或主机关闭)。通常,您需要将调度程序作为守护程序进程来启动,该进程会在主机操作系统启动时或在 Vertica 数据库启动后启动调度程序。

使用 vkconfig 脚本的关闭工具关闭正在运行的调度程序。有关详细信息,请参阅关闭工具选项

可以在调度程序运行时更改它的大部分设置(例如,添加或更改群集、源、目标和微批处理)。调度程序会自动处理配置更新。

启动多个作业调度程序以实现高可用性

要实现高可用性,您可以面向同一个配置架构启动两个或多个相同的调度程序。可以使用启动工具的 --instance-name 选项来区分调度程序(请参阅启动工具选项)。

活动调度程序加载数据并维护 stream_lock 表上的 S 锁。未使用的调度程序将一直处于备用模式,直到活动调度程序失败或被禁用。如果活动调度程序失败,备用调度程序将立即获取 stream_lock 表上的锁,并从失败的调度程序离开的位置接管从 Kafka 的数据加载。

管理自动加载期间被拒绝的消息

Vertica 在自动加载期间使用解析器定义拒绝消息,这在微批处理加载规范中是必需操作。

调度程序将创建一个拒绝表来自动存储每个微批处理的被拒绝消息。要手动指定拒绝表,请在创建微批处理时使用 --rejection-schema--rejection-table微批处理实用程序选项。查询 stream_microbatches 表可返回微批处理的拒绝架构和拒绝表。

有关 Vertica 如何处理被拒绝数据的其他详细信息,请参阅处理杂乱的数据

将选项传递给调度程序的 JVM

调度程序使用 Java 虚拟机,通过 JDBC 连接到 Vertica。可以通过名为 VKCONFIG_JVM_OPTS 的 Linux 环境变量将命令行选项传递给 JVM。将调度程序配置为在连接 Vertica 时使用 TLS/SSL 加密时,此选项非常有用。有关详细信息,请参阅为调度程序配置 TLS 连接

从 MC 中查看调度程序

可以从 MC 中查看 Kafka 作业的状态。有关详细信息,请参考 查看加载历史记录