这是本节的多页打印视图。 点击此处打印.

返回本页常规视图.

使用来自 Kafka 的数据

Kafka 使用者订阅一个或多个由 Kafka 群集管理的主题。每个主题都是一个数据流,一个表示为有序消息序列的无界数据集。Vertica 可以手动或自动使用 Kafka 主题对您的流式传输数据进行分析。

手动使用数据

通过调用 KafkaSource 函数和解析器的 COPY 语句手动使用来自 Kafka 的数据。当您希望执行以下操作时,手动加载会非常有帮助:

  • 使用 Kafka 中当前存在的消息填充表一次。

  • 分析一组特定的消息。可以选择从 Kafka 流中加载的数据子集。

  • 在把调度程序设置为连续将数据流式传输到 Vertica 之前,探索 Kafka 流中的数据。

  • 以调度程序无法实现的方式控制数据加载。例如,在从 Kafka 加载数据期间无法执行业务逻辑或自定义拒绝处理,因为调度程序在其事务期间不支持其他处理。相反,可以定期运行一个事务来执行用来从 Kafka 加载数据的 COPY 语句,然后执行其他处理。

有关详细示例,请参阅 手动使用来自 Kafka 的数据

自动使用数据

通过调度程序(一个在数据到达时加载数据的命令行工具)自动使用从 Kafka 到 Vertica 的流式传输数据。调度程序以微批处理(一个工作单元,在指定的持续时间内处理单个 Kafka 主题的分区)定义的分段加载数据。可以使用 vkconfig 工具管理调度程序配置和选项。

有关详细信息,请参阅使用调度程序自动使用来自 Kafka 的数据

监控使用情况

必须监控消息使用情况以确保 Kafka 和 Vertica 有效地通信。可以使用本机 Kafka 工具监控使用者组,或者可以使用 vkconfig 工具查看详细使用信息。

有关其他信息,请参阅监控消息使用情况

使用 Kafka 筛选器解析数据

您的数据流可能会对默认情况下无法由 Kafka 解析器函数解析的数据进行编码。使用 Kafka 筛选器来分隔流中的消息,以改善数据使用情况。

有关详细信息,请参阅解析自定义格式

1 - 手动使用来自 Kafka 的数据

可以使用 COPY 语句将流式传输数据从 Kafka 手动加载到 Vertica 中,就像可以从一个文件或其他源加载一组有限数据一样。与标准数据源不同,Kafka 数据以消息流的形式连续到达,您必须先对这些消息进行解析才能将它们加载到 Vertica 中。在 COPY 语句中使用 Kafka 函数准备数据流。

此示例以增量式构建一个 COPY 语句,它从名为 web_hits 的 Kafka 主题手动加载 JSON 编码数据。web_hits 主题流式传输网站请求的服务器日志。

有关将数据加载到 Vertica 中的信息,请参阅数据加载

创建目标表

要确定目标表架构,必须确定消息结构。下面是 web_hits 流的一个示例:

{"url": "list.jsp", "ip": "144.177.38.106", "date": "2017/05/02 20:56:00",
"user-agent": "Mozilla/5.0 (compatible; MSIE 6.0; Windows NT 6.0; Trident/5.1)"}
{"url": "search/wp-content.html", "ip": "215.141.172.28", "date": "2017/05/02 20:56:01",
"user-agent": "Opera/9.53.(Windows NT 5.2; sl-SI) Presto/2.9.161 Version/10.00"}

此主题流式传输 JSON 编码数据。由于 JSON 数据不一致,并且可能包含不可预测的增加值,因此请将该数据流存储在一个 Flex 表中。Flex 表动态接受数据中出现的其他字段。

以下语句创建一个名为 web_table 的 Flex 表来存储该数据流:

=> CREATE FLEX TABLE web_table();

在 COPY 语句的开头,添加 web_table 作为目标表:

COPY web_table

有关 Flex 表的详细信息,请参阅 Flex 表

定义 KafkaSource

COPY 语句的源始终是 KafkaSource。KafkaSource 接受有关数据流、Kafka 代理和其他处理选项的详细信息以连续加载数据,直到满足结束条件

流详细信息

stream 参数定义您希望从一个或多个主题分区中加载的数据段。每个 Kafka 主题都将其消息拆分为不同的分区以获得可扩展的吞吐量。Kafka 根据 Kafka 管理员设置的规则为每个主题保留一个消息待完成项。可以选择加载待完成项中的部分或全部消息,或者仅加载当前流式传输的消息。

对于每个分区,stream 参数要求以竖线 (|) 分隔的列表形式提供主题名称、主题分区和分区偏移量。(可选)可以提供一个结束偏移量作为停止从数据流加载的结束条件:

'stream='topic_name|partition|start_offset[|end_offset]'

要从 web_hits 主题的单个分区加载整个待完成项,请使用 SOURCE 关键字附加具有以下 stream 参数值的 KafkaSource:

COPY ...
SOURCE KafkaSource(stream='web_hits|0|-2', ...

在前面的示例中:

  • web_hits 是要从中加载数据的主题的名称。

  • 0 是要从中加载数据的主题分区。主题分区从 0 开始建立索引,web_hits 仅包含一个分区。

  • -2 加载整个待完成项。这是一个特殊的偏移量值,它通知 KafkaSource 以最早可用的消息偏移量开始加载。

加载多个分区

此示例虽然仅从一个分区加载,但是请务必了解如何在一个 COPY 语句中从多个分区加载。

要从同一主题中的其他分区(甚至是其他主题)加载,请提供逗号分隔的列表,其中包括主题名称、分区号和由竖线分隔的偏移量值。例如,以下 stream 实参从 web_hits 主题的分区 0 到分区 2 加载整个待完成项:

KafkaSource(stream='web_hits|0|-2,web_hits|1|-2,web_hits|2|-2'...

当在同一 COPY 语句中加载多个分区时,可以将 executionparallelism 参数设置为定义针对 COPY 语句创建的线程数。理想情况下,希望每个分区使用一个线程。可以选择不指定值,让 Vertica 根据资源池中的分区数和可用资源确定线程数。在此例中,只有一个分区,因此不需要额外的线程来加载数据。

添加 Kafka 代理

KafkaSource 需要 Kafka 群集中代理的主机名(或 IP 地址)和端口号。Kafka 代理是 Vertica 为了检索 Kafka 数据而访问的服务。在以下示例中,Kafka 群集有一个名为 kafka01.example.com 的代理,该代理在端口 9092 上运行。请将 brokers 参数和值附加到 COPY 语句:

COPY ...
SOURCE KafkaSource(stream='web_hits|0|-2',
                   brokers='kafka01.example.com:9092', ...

选择结束条件

因为数据从 Kafka 连续到达,所以从 Kafka 手动加载需要定义一个结束条件来指示何时停止加载数据。除了流详细信息中描述的结束偏移量,还可以选择:

  • 在设置的一段时间内复制尽可能多的数据。

  • 一直加载数据,直到在超时时间内没有新数据到达。

  • 加载所有可用数据,而不等待任何进一步的数据到达。

以下示例运行 10000 毫秒(10 秒)的 COPY 以获取数据样本。如果 COPY 语句能够在 10 秒内加载整个数据待完成项,那么它将剩余的时间用于在流式传输数据到达时进行加载。此值在 duration 参数中设置。附加 duration 值以完成 KafkaSource 定义:

COPY ...
SOURCE KafkaSource(stream='web_hits|0|-2',
                    brokers='kafka01.example.com:9092',
                    duration=interval '10000 milliseconds')

如果从 Kafka 启动具有较长持续时间的 COPY 语句,并且需要停止它,可以调用一个函数(例如 CLOSE_ALL_SESSIONS)来关闭其会话。

选择解析器

Kafka 不强制其数据流上的消息格式。消息通常采用 Avro 或 JSON 格式,但也可以采用任何格式。COPY 语句通常使用三个 Kafka 特定解析器之一:

因为 Kafka 解析器可以识别流式传输数据中的记录边界,所以其他解析器(如 Flex 解析器)不与 KafkaSource 的输出直接兼容。必须使用筛选器更改 KafkaSource 输出,其他解析器才能处理数据。有关详细信息,请参阅解析自定义格式

在以下示例中,web_hits 中的数据采用 JSON 格式编码,因此它使用 KafkaJSONParser。此值在 COPY 语句的 PARSER 子句中进行设置:

COPY ...
SOURCE ...
PARSER KafkaJSONParser()

存储拒绝的数据

Vertica 将解析器无法解析的原始 Kafka 消息连同有关拒绝原因的信息保存到拒绝表中。该表由 COPY 语句创建。以下示例将拒绝内容保存到名为 web_hits_rejections 的表中。此值在 COPY 语句的 REJECTED DATA AS TABLE 子句中设置:

COPY ...
SOURCE ...
PARSER ...
REJECTED DATA AS TABLE public.web_hits_rejections;

将数据流加载到 Vertica 中

以下步骤使用前面几个部分中以增量方式构建的 COPY 语句加载 web_hits 主题中的 JSON 数据 10 秒:

  1. 执行 COPY 语句:

    => COPY web_table
       SOURCE KafkaSource(stream='web_hits|0|-2',
                          brokers='kafka01.example.com:9092',
                          duration=interval '10000 milliseconds')
       PARSER KafkaJSONParser()
       REJECTED DATA AS TABLE public.web_hits_rejections;
     Rows Loaded
    -------------
             800
    (1 row)
    
  2. 计算 Flex 表键:

    => SELECT compute_flextable_keys('web_table');
                  compute_flextable_keys
    --------------------------------------------------
     Please see public.web_table_keys for updated keys
    (1 row)
    

    有关更多详细信息,请参阅计算 Flex 表键

  3. 查询 web_table_keys 以返回这些键:

    => SELECT * FROM web_table_keys;
      key_name  | frequency | data_type_guess
    ------------+-----------+-----------------
     date       |       800 | Timestamp
     user_agent |       800 | Varchar(294)
     ip         |       800 | Varchar(30)
     url        |       800 | Varchar(88)
    (4 rows)
    
  4. 查询 web_table 以返回从 web_hits Kafka 主题加载的数据:

    => SELECT date, url, ip FROM web_table LIMIT 10;
            date         |                url                 |       ip
    ---------------------+------------------------------------+-----------------
     2021-10-15 02:33:31 | search/index.htm                   | 192.168.210.61
     2021-10-17 16:58:27 | wp-content/about.html              | 10.59.149.131
     2021-10-05 09:10:06 | wp-content/posts/category/faq.html | 172.19.122.146
     2021-10-01 08:05:39 | blog/wp-content/home.jsp           | 192.168.136.207
     2021-10-10 07:28:39 | main/main.jsp                      | 172.18.192.9
     2021-10-22 12:41:33 | tags/categories/about.html         | 10.120.75.17
     2021-10-17 09:41:09 | explore/posts/main/faq.jsp         | 10.128.39.196
     2021-10-13 06:45:36 | category/list/home.jsp             | 192.168.90.200
     2021-10-27 11:03:50 | category/posts/posts/index.php     | 10.124.166.226
     2021-10-26 01:35:12 | categories/search/category.htm     | 192.168.76.40
    (10 rows)
    

2 - 使用调度程序自动使用来自 Kafka 的数据

Vertica 提供了一个调度程序,可加载来自一个或多个 Kafka 主题的流消息。与手动使用 COPY 相比,自动加载流式传输数据有诸多优势:

  • 流数据会自动出现在数据库中。新数据出现在数据库中的频率由调度程序的时间范围持续时间控制。

  • 调度程序提供了一个只使用一次的过程。调度程序为您管理偏移量,以便 Kafka 发送的每条消息都会使用一次。

  • 可以将备份调度程序配置为提供高可用性。如果主调度程序由于某种原因失败,备用调度程序将自动接管数据加载。

  • 调度程序管理用于加载数据的资源。可通过分配给调度程序的资源池设置来控制调度程序对资源的使用情况。在手动加载时,必须考虑在加载时所使用的资源。

使用调度程序也有一些缺点,可能不适合您的需求。您可能会发现调度程序无法提供加载过程所需的灵活性。例如,调度程序无法在加载事务期间执行业务逻辑。如果需要执行这种处理,最好创建自己的加载过程。此过程会定期运行 COPY 语句来加载 Kafka 中的数据。然后,它会在提交事务之前执行所需的业务逻辑处理。

有关作业调度程序要求的信息,请参考 Apache Kafka 集成

作业调度程序的功能

调度程序负责安排从 Kafka 加载数据。调度程序的基本处理单元是一个时间范围,也就是一段时间。在每一时间范围内,调度程序为要运行的每个活动微批处理分配一段时间。每个微批处理负责从单个来源加载数据。一旦时间范围结束,调度程序将开始下一时间范围。调度程序继续这个过程,直到您停止它为止。

调度程序剖析

每个调度程序都有多组设置,每组设置控制数据加载的一个方面。这些组包括:

  • 调度程序本身,用来定义配置架构、时间范围持续时间和资源池。

  • 群集,用来定义 Kafka 群集中与调度程序联系来加载数据的数据。每个调度程序可以包含多个群集,从而允许使用单个调度程序来从多个 Kafka 群集加载数据。

  • 源,用来定义 Kafka 主题及这些主题中从中读取数据的分区。

  • 目标,用来定义 Vertica 中将接收数据的表。这些表可以是传统的 Vertica 数据库表,也可以是 Flex 表。

  • 加载规范,用来定义 Vertica 在加载数据时使用的设置。这些设置包括 Vertica 加载数据时需要使用的解析器和筛选器。例如,如果要读取采用 Avro 格式的 Kafka 主题,则加载规范需要指定 Avro 解析器和架构。

  • 微批处理,用来 表示来自 Kafka 流的数据负载的各个分段。它们结合了您使用其他 vkconfig 工具创建的群集、源、目标和负载规范的定义。该调度程序使用微批处理中的所有信息执行 COPY 语句,并使用 KafkaSource UDL 函数将数据从 Kafka 传输到 Vertica。每个微批处理负载的统计信息均存储在 stream_microbatch_history 表中。

vkconfig 脚本

可以使用名为 vkconfig 的 Linux 命令行脚本来创建、配置和运行调度程序。此脚本安装在 Vertica 主机及 Vertica 服务器上的以下路径:

/opt/vertica/packages/kafka/bin/vkconfig

vkconfig 脚本包含多个工具。Vkconfig 脚本的第一个实参始终为您希望使用的工具。每个工具执行一个功能,例如更改一组设置(如群集或源)或启动和停止调度程序。例如,要创建或配置调度程序,可以使用以下命令:

$ /opt/vertica/packages/kafka/bin/vkconfig scheduler other options...

创建调度程序时会发生什么

在创建新调度程序时,vkconfig 脚本执行以下步骤:

  • 使用为调度程序指定的名称创建一个新的 Vertica 架构。可以在配置期间使用此名称标识调度程序。

  • 在新创建的架构中创建为了管理 Kafka 数据加载而所需的表。有关详细信息,请参阅数据流式传输架构表

验证调度程序

在创建或配置调度程序时,调度程序会验证以下设置:

  • 确认指定群集中的所有代理都存在。

  • 连接到指定的主机,并检索 Kafka 群集中所有代理的列表。获取此列表始终确保调度程序拥有所有代理的最新列表。如果主机是已经定义的群集的一部分,则调度程序将取消配置。

  • 确认指定的源是否存在。如果该源不再存在,则禁用该源。

  • 检索源中的分区数目。如果从源检索到的分区数与调度程序保存的分区值不同,Vertica 将使用从群集中的源检索到的分区数更新该调度程序。

可以使用 vkconfig 脚本的调度程序工具中的 --validation-type 选项禁用验证。有关详细信息,请参阅调度程序工具选项

同步调度程序

默认情况下,调度程序自动与 Kafka 主机群集同步其配置和源信息。可以使用 --config-refresh 调度程序实用程序选项配置同步间隔。在每个间隔,调度程序都会:

  • 通过在其 Vertica 配置架构中查询其设置,检查调度程序的配置有无更新。

  • 执行验证调度程序中列出的所有检查。

可以使用 vkconfig 脚本的调度程序工具中的 --auto-sync 选项配置同步设置。 调度程序工具选项 了解详细信息。

启动调度程序

可以使用 vkconfig 脚本的启动工具启动调度程序。

启动调度程序时,它会从源收集数据,从指定的偏移量开始。您可以查看 stream_microbatch_history 表,了解调度程序在任何给定时间执行的操作。

要了解如何创建、配置和启动调度程序,请参阅本指南中的设置调度程序

您也可以选择屏蔽调度程序。例如,您可能希望使用指定的偏移量范围执行一次加载。有关详细信息,请参阅本指南中的手动使用来自 Kafka 的数据

如果 Vertica 群集下线,则调度程序将尝试重新连接,但失败。当群集重新启动时,必须重新启动调度程序。

管理正在运行的调度程序

从命令行启动调度程序时,它在前台运行。它会一直运行,直到您终止(或主机关闭)。通常,您需要将调度程序作为守护程序进程来启动,该进程会在主机操作系统启动时或在 Vertica 数据库启动后启动调度程序。

使用 vkconfig 脚本的关闭工具关闭正在运行的调度程序。有关详细信息,请参阅关闭工具选项

可以在调度程序运行时更改它的大部分设置(例如,添加或更改群集、源、目标和微批处理)。调度程序会自动处理配置更新。

启动多个作业调度程序以实现高可用性

要实现高可用性,您可以面向同一个配置架构启动两个或多个相同的调度程序。可以使用启动工具的 --instance-name 选项来区分调度程序(请参阅启动工具选项)。

活动调度程序加载数据并维护 stream_lock 表上的 S 锁。未使用的调度程序将一直处于备用模式,直到活动调度程序失败或被禁用。如果活动调度程序失败,备用调度程序将立即获取 stream_lock 表上的锁,并从失败的调度程序离开的位置接管从 Kafka 的数据加载。

管理自动加载期间被拒绝的消息

Vertica 在自动加载期间使用解析器定义拒绝消息,这在微批处理加载规范中是必需操作。

调度程序将创建一个拒绝表来自动存储每个微批处理的被拒绝消息。要手动指定拒绝表,请在创建微批处理时使用 --rejection-schema--rejection-table微批处理实用程序选项。查询 stream_microbatches 表可返回微批处理的拒绝架构和拒绝表。

有关 Vertica 如何处理被拒绝数据的其他详细信息,请参阅处理杂乱的数据

将选项传递给调度程序的 JVM

调度程序使用 Java 虚拟机,通过 JDBC 连接到 Vertica。可以通过名为 VKCONFIG_JVM_OPTS 的 Linux 环境变量将命令行选项传递给 JVM。将调度程序配置为在连接 Vertica 时使用 TLS/SSL 加密时,此选项非常有用。有关详细信息,请参阅为调度程序配置 TLS 连接

从 MC 中查看调度程序

可以从 MC 中查看 Kafka 作业的状态。有关详细信息,请参考 查看加载历史记录

2.1 - 设置调度程序

您可以使用 Linux 命令行设置调度程序。通常,可以在要运行调度程序的主机上执行配置。该主机可以是您的 Vertica 主机之一,也可以是已安装 vkconfig 实用程序的其他主机(有关详细信息,请参阅 vkconfig 脚本)。

请按照以下步骤设置并启动调度程序以将数据从 Kafka 流式传输到 Vertica:

  1. 创建配置文件(可选)

  2. 将 Kafka Bin 目录添加到路径中(可选)

  3. 为调度程序创建资源池

  4. 创建调度程序

  5. 创建群集

  6. 创建数据表

  7. 创建源

  8. 创建目标

  9. 创建加载规范

  10. 创建微批处理

  11. 启动调度程序

上述步骤将在以下部分中进行说明。以下各部分将使用将 Web 日志数据(网站点击量)从 Kafka 加载到 Vertica 表的示例。

创建配置文件(可选)

您在创建调度程序时提供给 vkconfig 脚本的许多实参都不会改变。例如,您经常需要将用户名和密码传递给 Vertica 以授权对数据库进行更改。在每次调用 vkconfig 时添加用户名和密码既繁琐且易出错。

您可以改为使用为您指定这些实参的 --conf 选项向 vkconfig 实用程序传递配置文件。这样可避免大量的打字工作,降低挫败感。

配置文件是一个文本文件,其中每行均包含“关键字=”对。每个关键字都是一个 vkconfig 命令行选项,例如常用 vkconfig 脚本选项中列出的选项。

以下示例显示了名为 weblog.conf 的配置文件,该文件将用于定义名为 weblog_sched 的调度程序。此示例的剩余部分都会使用此配置文件。


# The configuraton options for the weblog_sched scheduler.
username=dbadmin
password=mypassword
dbhost=vertica01.example.com
dbport=5433
config-schema=weblog_sched

将 vkconfig 目录添加到路径中(可选)

vkconfig 脚本位于 /opt/vertica/packages/kafka/bin 目录中。在每次调用 vkconfig 时键入此路径比较繁琐。您可以使用以下命令将 vkconfig 添加到当前 Linux 会话的搜索路径中:

$ export PATH=/opt/vertica/packages/kafka/bin:$PATH

对于会话的剩余部分,您可以调用 vkconfig 而无需指定其整个路径:

$ vkconfig
Invalid tool
Valid options are scheduler, cluster, source, target, load-spec, microbatch, sync, launch,
shutdown, help

如果要使此设置永久有效,请将导出语句添加到 ~/.profile 文件中。此示例的剩余部分假设您已将此目录添加到 shell 的搜索路径中。

为调度程序创建资源池

Vertica 建议您始终专门为每个调度程序创建一个 资源池。调度程序假定自身独占使用分配的资源池。为调度程序使用单独的池,有助于优化它对 Vertica 群集性能的影响。您可以使用 CREATE RESOURCE POOL 语句在 Vertica 中创建资源池。

以下资源池设置在创建调度程序的资源池时发挥着重要作用:

  • PLANNEDCONCURRENCY 决定调度程序同时发送给 Vertica 的微批处理(COPY 语句)的数量。

  • EXECUTIONPARALLELISM 决定每个节点为处理微批处理分区而创建的最大线程数。

  • QUEUETIMEOUT 提供对资源计时的手动控制。将此参数设置为 0 即可允许调度程序管理计时。

有关这些设置以及如何为调度程序微调资源池的详细信息,请参阅管理调度程序资源和性能

以下 CREATE RESOURCE POOL 语句将创建可加载 1 个微批处理并处理 1 个分区的资源池:

=> CREATE RESOURCE POOL weblog_pool
    MEMORYSIZE '10%'
    PLANNEDCONCURRENCY 1
    EXECUTIONPARALLELISM 1
    QUEUETIMEOUT 0;

如果没有为调度程序创建和分配资源池,它将使用 GENERAL 资源池的一部分。Vertica 建议您不要将 GENERAL 池用于生产环境中使用的调度程序。这种回退到使用 GENERAL 池的举措,是为了在测试调度程序配置期间提供方便。当您准备好部署调度程序时,请创建一个您已根据特定需求进行调整的资源池。每次启动使用 GENERAL 池的调度程序时,vkconfig 实用程序都会显示一条警告消息。

如果没有为调度程序分配足够的资源,则有可能会导致错误。例如,如果调度程序无法从预期的数据帧中加载所有主题的数据,则可能会显示错误:OVERSHOT DEADLINE FOR FRAME。

有关资源池的详细信息,请参阅资源池架构

创建调度程序

Vertica 包含名为 stream_config 的默认调度程序。您可以使用此调度程序,或者使用 vkconfig 脚本的调度程序工具以及 --create--config-schema 选项来创建新的调度程序:

$ vkconfig scheduler --create --config-schema scheduler_name --conf conf_file

添加使用默认选项的调度程序,只需使用 --create--config-schema 选项即可。此命令会在 Vertica 中创建一个包含调度程序配置的新架构。有关创建调度程序架构的详细信息,请参阅创建调度程序时发生的情况

您也可使用其他配置参数进一步自定义您的调度程序。有关详细信息,请参阅调度程序工具选项

下面的例子:

  • 使用 --config-schema 选项创建名为 weblog_sched 的调度程序。

  • 使用 --operator 选项向名为 kafka_user 的 Vertica 用户授予配置和运行调度程序的权限。dbadmin 用户必须单独指定其他权限。

  • 使用 --frame-duration 选项指定七分钟的时间范围持续时间。有关选择时间范围持续时间的详细信息,请参阅选择时间范围持续时间

  • 将调度程序使用的资源池设置为之前创建的 weblog_pool:

$ vkconfig scheduler --create --config-schema weblog_sched --operator kafka_user \
  --frame-duration '00:07:00' --resource-pool weblog_pool --conf weblog.conf

创建群集

您必须将至少一个 Kafka 群集与调度程序相关联。调度程序可以访问多个 Kafka 群集。要创建群集,您可以提供群集名称以及 Kafka 群集代理的主机名和端口。

创建群集时,调度程序会尝试通过连接到 Kafka 群集来对其进行验证。如果连接成功,调度程序会自动检索群集中所有代理的列表。因此,您不必在 --hosts 参数中列出每个代理。

以下示例会创建一个名为 kafka_weblog 的群集,其中包含两个 Kafka 代理主机:example.com 域中的 kafka01 和 kafka03。Kafka 代理正在端口 9092 上运行。

$ vkconfig cluster --create --cluster kafka_weblog \
  --hosts kafka01.example.com:9092,kafka03.example.com:9092 --conf weblog.conf

有关详细信息,请参阅群集工具选项

创建源

接下来,至少创建一个供调度程序读取的源。源定义了调度程序将从中加载数据的 Kafka 主题以及该主题包含的分区数。

要创建源并将其与已配置的调度程序相关联,请使用 source 工具。创建源时,Vertica 会连接到 Kafka 群集以验证主题是否存在。因此,在创建源之前,请确保该主题已存在于 Kafka 群集中。由于 Vertica 会验证主题是否存在,因此您必须使用 --cluster 选项提供先前定义的群集名称。

以下示例会在上一步中创建的群集上为名为 web_hits 的 Kafka 主题创建源。此主题具有一个分区。

$ vkconfig source --create --cluster kafka_weblog --source web_hits --partitions 1 --conf weblog.conf

有关详细信息,请参阅源工具选项

创建数据表

在可以为调度程序创建目标之前,您必须在 Vertica 数据库中创建目标表。这是 Vertica 用于存储调度程序从 Kafka 加载的数据的表。您必须决定为目标创建的表类型:

  • 使用 CREATE TABLE 语句创建的标准 Vertica 数据库表。此类型的表可高效地存储数据。但是,您必须确保其列与您正在加载的 Kafka 主题中消息的数据格式相匹配。不能将复杂类型的数据加载到标准 Vertica 表中。

  • 使用 CREATE FLEXIBLE TABLE 创建的 Flex 表。Flex 表的效率低于标准 Vertica 数据库表。但是,它足够灵活,可以处理架构不断变化的数据。此外,还可以加载标准 Vertica 表无法加载的大多数复杂数据类型(例如地图和列表)。

此示例中的数据采用的是设定好的格式,因此最好使用标准 Vertica 表。以下示例会创建名为 web_hits 的表来保存四列数据。此表位于公共架构中。

=> CREATE TABLE web_hits (ip VARCHAR(16), url VARCHAR(256), date DATETIME, user_agent VARCHAR(1024));

创建目标

创建目标表后,您可以创建调度程序的目标。目标会告知调度程序在何处存储从 Kafka 检索到的数据。创建目标时,此表必须存在。您可以使用带有 --target-schema--target_table 选项的 vkconfig 脚本的目标工具来指定 Vertica 目标表的架构和名称。以下示例会为在上一步中创建的表添加目标。

$ vkconfig target --create --target-schema public --target-table web_hits --conf weblog.conf

有关详细信息,请参阅目标工具选项

创建加载规范

调度程序的加载规范提供 Vertica 在解析从 Kafka 加载的数据时使用的参数。最重要的选项是 --parser,可设置 Vertica 用于解析数据的解析器。您可以使用三个解析器选项:

在此示例中,正在从 Kafka 加载的数据采用 JSON 格式。以下命令会创建名为 weblog_load 的加载规范并将解析器设置为 KafkaJSONParser。

$ vkconfig load-spec --create --parser KafkaJSONParser --load-spec weblog_load --conf weblog.conf

有关详细信息,请参阅加载规范工具选项

创建微批处理

微批处理将合并到目前为止添加到调度程序的所有设置,以定义调度程序用于从 Kafka 加载数据的各个 COPY 语句。

以下示例使用前面示例中创建的所有设置来创建名为 weblog 的微批处理。

$ vkconfig microbatch --create --microbatch weblog --target-schema public --target-table web_hits \
           --add-source web_hits --add-source-cluster kafka_weblog --load-spec weblog_load \
           --conf weblog.conf

对于可能受益于事务大小减小的微批处理,请考虑在创建微批处理时使用 --max-parallelism 选项。此选项会将具有多个分区的单个微批处理拆分为由较少分区组成的指定数量的同步 COPY 语句。

有关 --max-parallelism 和其他选项的详细信息,请参阅微批处理工具选项

启动调度程序

创建至少一个微批处理后,就可以运行调度程序。您可以使用启动工具来启动调度程序,并将调度程序架构的名称传递给它。调度程序会开始为在其架构中定义的每个已启用微批处理调度微批处理加载。

以下示例会启动在前面步骤中定义的 weblog 调度程序。该调度程序会使用 nohup 命令来防止调度程序在用户注销时被终止,并重定向 stdout 和 stderr 来防止创建 nohup.out 文件。

$ nohup vkconfig launch --conf weblog.conf >/dev/null 2>&1 &

有关详细信息,请参阅启动工具选项

检查调度程序是否正在运行

启动调度程序后,您可以通过查询调度程序架构中的 stream_microbatch_history 表来验证它是否正在运行。此表列出了调度程序已运行的每个微批处理的结果。

例如,此查询列出了微批处理名称、微批处理的开始时间和结束时间、批处理的开始偏移量和结束偏移量,以及批处理结束的原因。结果从调度程序已启动时开始排序:

=> SELECT microbatch, batch_start, batch_end, start_offset,
          end_offset, end_reason
          FROM weblog_sched.stream_microbatch_history
          ORDER BY batch_start DESC LIMIT 10;

 microbatch |        batch_start         |         batch_end          | start_offset | end_offset |  end_reason
------------+----------------------------+----------------------------+--------------+------------+---------------
 weblog     | 2017-10-04 09:30:19.100752 | 2017-10-04 09:30:20.455739 |           -2 |      34931 | END_OF_STREAM
 weblog     | 2017-10-04 09:30:49.161756 | 2017-10-04 09:30:49.873389 |        34931 |      34955 | END_OF_STREAM
 weblog     | 2017-10-04 09:31:19.25731  | 2017-10-04 09:31:22.203173 |        34955 |      35274 | END_OF_STREAM
 weblog     | 2017-10-04 09:31:49.299119 | 2017-10-04 09:31:50.669889 |        35274 |      35555 | END_OF_STREAM
 weblog     | 2017-10-04 09:32:19.43153  | 2017-10-04 09:32:20.7519   |        35555 |      35852 | END_OF_STREAM
 weblog     | 2017-10-04 09:32:49.397684 | 2017-10-04 09:32:50.091675 |        35852 |      36142 | END_OF_STREAM
 weblog     | 2017-10-04 09:33:19.449274 | 2017-10-04 09:33:20.724478 |        36142 |      36444 | END_OF_STREAM
 weblog     | 2017-10-04 09:33:49.481563 | 2017-10-04 09:33:50.068068 |        36444 |      36734 | END_OF_STREAM
 weblog     | 2017-10-04 09:34:19.661624 | 2017-10-04 09:34:20.639078 |        36734 |      37036 | END_OF_STREAM
 weblog     | 2017-10-04 09:34:49.612355 | 2017-10-04 09:34:50.121824 |        37036 |      37327 | END_OF_STREAM
(10 rows)

2.2 - 选择时间范围持续时间

调度程序的一个关键设置是它的时间范围持续时间。持续时间设置调度程序运行您为其定义的所有微批处理的时间量。此设置对如何从 Apache Kafka 加载数据有显著的影响。

在每一时间范围期间会发生什么

要了解正确的时间范围持续时间,首先需要了解在每一时间范围期间发生了什么。

时间范围持续时间在添加到调度程序的微批处理之间分配。此外,每一时间范围都有一些开销,需要花时间处理微批处理。在每个微批处理中,也有一些开销,这减少了微批处理在从 Kafka 加载数据方面花费时间。下图大致显示了每一时间范围是如何划分的:

正如您所见,时间范围中只有一部分时间用于实际加载流式传输数据。

调度程序如何对微批处理排列优先顺序

首先,调度程序将时间范围中的时间均匀地分给各个微批处理。然后,它依次运行每个微批处理。

在每个微批处理中,专门花大量时间来通过 COPY 语句加载数据。此语句使用 KafkaSource UDL 加载数据。它会一直运行到发生以下两种情况之一:

  • 它到达为微批处理定义的主题和分区的数据流的末端。在这种情况下,微批处理提前完成处理。

  • 它到达调度程序设置的超时时间。

当调度程序处理时间范围时,它会记下哪些微批处理提前完成。然后,它会安排它们在下一时间范围首先运行。如果以这种方式安排微批处理,则可以让调度程序将时间范围中的更多时间分配给在加载数据方面花费最多时间的微批处理(或许还没有足够的时间到达其数据流的末端)。

例如,请考虑下图。在第一时间范围期间,调度程序在微批处理之间平均分配时间。微批处理 #2 使用分配给它的所有时间(如填充区域所示),而其他微批处理并未这样。在下一时间范围中,调度程序重新排列这些微批处理,使提前完成的微批处理首先进行。它还将更少的时间分配给运行时间较短的微批处理。假定这些微批处理再次提前完成,调度程序便能够将时间范围中的剩余时间分配给微批处理 #2。当调度程序运行时,这种优先级的转移将会继续进行。如果一个主题的流量激增,调度程序会为读取该主题的微批处理分配更多的时间作为补偿。

时间范围持续时间太短会发生什么

如果将调度程序的时间范围持续时间设置得太短,微批处理可能没有足够的时间来加载它们负责读取的数据流中的所有数据。在最坏的情况下,当在每一时间范围期间读取大容量主题时,微批处理可能会落后更多。如果不加以解决,这个问题可能会导致消息永远无法加载,因为在微批处理有机会读取它们之前,它们就已经在数据流中过期了。

在极端情况下,调度程序可能无法在每一时间范围期间中运行每一个微批处理。如果时间范围持续时间太短,以至于大部分时间都花在开销任务(如提交数据和准备运行微批处理)上,便会发生这个问题。每个微批处理为了从 Kafka 加载数据而运行的 COPY 语句的最短持续时间为 1 秒。再加上处理数据加载的开销。一般来说,如果时间范围持续时间短于 2 秒乘以调度程序中的微批处理数,那么一些微批处理可能会没有机会在每一时间范围中运行。

如果调度程序在一个时间范围期间运行每个微批处理超时,那么它在下一时间范围期间会通过优先安排前一时间范围中没有运行的微批处理来做出补偿。这个策略确保每个微批处理都有机会加载数据。但是,这并不能解决问题的根本原因。最好的解决方案是增加时间范围持续时间,为每个微批处理分配足够的时间来在每一时间范围期间加载数据。

时间范围持续时间太长会发生什么

长时间范围持续时间的一个缺点是增加了数据延迟。这个延迟是从 Kafka 发出数据到可在数据库中查询数据之间的时间。较长的时间范围持续时间意味着微批处理的每次执行之间有更多的时间。这意味着更新数据库中的数据之间的间隔时间更长。

这种延迟可能并不重要,具体取决于应用。在确定时间范围持续时间时,请考虑数据可能会延迟到整个时间范围持续时间是否将导致问题。

使用长时间范围持续时间时要考虑的另一个问题是,关闭调度程序所需的时间。只有在当前 COPY 语句完成之后,调度程序才会关闭。根据时间范围持续时间的长度,这个过程可能需要几分钟。

最短时间范围持续时间

为添加到调度程序的每个微批处理至少分配两秒钟。如果时间范围持续时间短于该下限,vkconfig 实用程序将会发出警告。在大多数情况下,您都希望时间范围持续时间更长。如果每个微批处理两秒钟,几乎没有时间来加载数据。

平衡时间范围持续时间要求

要确定部署的最佳时间范围持续时间,请考虑您对数据延迟的敏感程度。如果不是在对来自 Kafka 的数据流式传输执行时间敏感型查询,可以使用默认的 5 分钟甚至更长的时间范围持续时间。如果需要数据延迟更短,那么请考虑从 Kafka 读取的数据量。如果时间范围持续时间较短,那么大容量数据或者流量有明显峰值的数据可能会导致问题。

针对不同需求使用不同的调度程序

假定您要加载的流式传输数据来自少数您希望以低延迟查询的 Kafka 主题,以及其他具有大量数据但您可以承受更长时间延迟的主题。在这种情况下,选择“中间”时间范围持续时间可能无法满足任一需求。更好的解决方案是使用多个调度程序:创建两个调度程序,其中一个调度程序的时间范围持续时间较短,仅读取需要以低延迟查询的主题;另一个调度程序的时间范围持续时间较长,用于从大容量主题加载数据。

例如,假定您要通过 Kafka 将流式传输数据从物联网 (IOT) 传感器网络加载到 Vertica。您可以使用其中的大部分数据定期生成报告并更新仪表板显示。这两个用例对时间都不是特别敏感。但是,正在从中进行加载的三个主题确实包含时间敏感型数据(系统故障、入侵检测和连接中断),这些数据必须立即触发警报。

在这种情况下,可以创建两个调度程序,其中一个调度程序的时间范围持续时间为 5 分钟或更长,用于读取包含非关键数据的大多数主题;第二个调度程序的时间范围持续时间至少为 6 秒(但最好更长),用于仅加载来自三个时间敏感型主题的数据。希望这些主题中的数据量足够小,以至于短时间范围持续时间不会导致问题。

2.3 - 管理调度程序资源和性能

调度程序的性能受调度程序中的微批处理数、每个微批处理中的分区和 Vertica 群集中的节点影响。使用资源池为调度程序分配一部分系统资源,并微调这些资源,以优化 Vertica 的自动加载。

以下各节详细介绍了调度程序资源池配置和处理场景:

调度程序和资源池

Vertica 建议您始终专门为每个调度程序创建一个 资源池。调度程序假定自身独占使用分配的资源池。为调度程序使用单独的池,有助于优化它对 Vertica 群集性能的影响。您可以使用 CREATE RESOURCE POOL 语句在 Vertica 中创建资源池。

如果没有为调度程序创建和分配资源池,它将使用 GENERAL 资源池的一部分。Vertica 建议您不要将 GENERAL 池用于生产环境中使用的调度程序。这种回退到使用 GENERAL 池的举措,是为了在测试调度程序配置期间提供方便。当您准备好部署调度程序时,请创建一个您已根据特定需求进行调整的资源池。每次启动使用 GENERAL 池的调度程序时,vkconfig 实用程序都会显示一条警告消息。

如果没有为调度程序分配足够的资源,则有可能会导致错误。例如,如果调度程序无法从预期的数据帧中加载所有主题的数据,则可能会显示错误:OVERSHOT DEADLINE FOR FRAME。

有关资源池的详细信息,请参阅资源池架构

关键资源池设置

微批处理是一个工作单元,它在一个时间范围持续时间内处理单个 Kafka 主题的分区。以下资源池设置在 Vertica 如何加载微批处理和处理分区中发挥着重要作用:

  • PLANNEDCONCURRENCY 决定调度程序同时发送给 Vertica 的微批处理(COPY 语句)的数量。在每一时间范围开始时,调度程序都会创建 PLANNEDCONCURRENCY 指定的调度程序线程数。每个调度程序线程连接到 Vertica,每次加载一个微批处理。如果微批处理数大于调度程序线程数,调度程序将额外的微批处理放入队列,并在线程变得可用时加载它们。
  • EXECUTIONPARALLELISM 决定每个节点用于处理微批处理分区的最大线程数。当一个微批处理加载到 Vertica 时,它的分区会均匀地分布在群集中的节点之间。在每一时间范围期间,节点最多为每个分区创建一个线程。每个线程一次从一个分区读取数据,直到处理完成或该时间范围结束。如果分区数大于所有节点上的线程数,则在线程可用时处理剩余的分区。
  • QUEUETIMEOUT 提供对资源计时的手动控制。将资源池参数 QUEUETIMEOUT 设置为 0 即可允许调度程序管理计时。在所有的微批处理都得到处理后,调度程序将等待该时间范围的剩余时间来处理下一个微批处理。具有适当大小的配置包括为了应对流量高峰而规划的休息时间。有关时间范围持续时间大小所带来影响的信息,请参阅选择时间范围持续时间

例如,以下 CREATE RESOURCE POOL 语句将创建一个名为 weblogs_pool 的资源池,它同时加载 2 个微批处理。Vertica 群集中的每个节点为每个微批处理创建 10 个线程来处理分区:

=> CREATE RESOURCE POOL weblogs_pool
    MEMORYSIZE '10%'
    PLANNEDCONCURRENCY 2
    EXECUTIONPARALLELISM 10
    QUEUETIMEOUT 0;

对于三节点 Vertica 群集,weblogs_pool 为每个节点提供的资源将最多创建 10 个线程来处理分区,或者每个微批处理最多总共 30 个线程。

并发加载多个微批处理

在某些情况下,调度程序中的微批处理可能比可用的 PLANNEDCONCURRENCY 多。下面的几张图说明了当没有足够的调度程序线程同时加载每个微批处理时,调度程序如何将微批处理加载到单个 Vertica 节点中。虽然资源池的 PLANNEDCONCURRENCY (PC) 设置为 2,但是调度程序必须加载三个微批处理:A、B 和 C。为简单起见,将 EXECUTIONPARALLELISM (EP) 设置为 1。

首先,调度程序加载微批处理 A 和微批处理 B,而微批处理 C 等待:

加载第一组微批处理。

当任何一个微批处理完成加载后,调度程序加载任何剩余的微批处理。在下图中,微批处理 A 已完全加载到 Vertica 中。调度程序继续加载微批处理 B,并使用新的可用调度程序线程加载微批处理 C:

加载剩余的微批处理。

调度程序继续发送数据,直到将所有微批处理加载到 Vertica 中,或该时间范围结束。

试用 PLANNEDCONCURRENCY 来优化性能。请注意,设置得过高可能会在每一时间范围的开始创建太多的连接,从而导致 Vertica 或 Kafka 的可扩展性压力。将 PLANNEDCONCURRENCY 设置得过低则不能充分利用 Vertica 的多处理能力。

Vertica 中的并行处理

资源池设置 EXECUTIONPARALLELISM 限制每个 Vertica 节点为处理分区创建的线程数。下图演示了在 EXECUTIONPARALLELISM 不足以为每个分区创建一个线程时,三节点 Vertica 群集如何处理具有九个分区的主题。这些分区均匀地分布在 Vertica 群集中的节点 1、节点 2 和节点 3 之间。调度程序的资源池将 PLANNEDCONCURRENCY (PC) 设置为 1,将 EXECUTIONPARALLELISM (EP) 设置为 2,因此当调度程序加载微批处理 A 时,每个节点最多创建 2 个线程。每个线程一次从一个分区进行读取。没有为其分配线程的分区必须等待处理:

使用可用线程处理分区。

当线程处理完为其分配的分区后,剩下的分区将在线程可用时分配给线程:

当线程可用时处理剩余分区。

在调度程序的资源池上设置 EXECUTIONPARALLELISM 时,请考虑调度程序中所有微批处理的分区数。

并发加载分区主题

具有多个分区的单个主题可能会从增加并行加载或降低事务大小中受益。使用 --max-parallelism微批处理,可以动态地将具有多个分区的主题拆分为多个负载均衡的微批处理,每个微批处理均由原始微批处理的分区的子集组成。调度程序使用其资源池中可用的 PLANNEDCONCURRENCY 来同时加载动态拆分的微批处理。

调度程序资源池中的 EXECUTIONPARALLELISM 设置决定每个节点为处理其单个微批处理的部分分区而创建的最大线程数。拆分微批处理使每个节点都能够为同一工作单元创建更多的线程。当有足够的 PLANNEDCONCURRENCY 并且每个节点分配的分区数大于调度程序资源池中的 EXECUTIONPARALLELISM 设置时,使用 --max-parallelism 拆分微批处理并在每个节点上创建更多线程来并行处理更多分区。

下图演示了双节点 Vertica 群集如何使用 PLANNEDCONCURRENCY (PC) 和 EXECUTIONPARALLELISM (EP) 均设置为 2 的资源池来加载和处理微批处理 A。因为调度程序只加载一个微批处理,所以有 1 个调度程序线程未使用。每个节点为每个调度程序线程创建 2 个线程来处理分配给它的分区:

在不使用 max-parallelism 选项时加载。

将微批处理 A 的 --max-parallelism 选项设为 2 将使调度程序可以动态地将微批处理 A 拆分为 2 个更小的微批处理 A1 和 A2。因为有 2 个调度程序线程可用,所以子集微批处理会同时加载到 Vertica 中。每个节点为每个调度程序线程创建 2 个线程来处理微批处理 A1 和 A2 的分区:

使用 max-parallelism 选项加载。

使用 --max-parallelism 防止由大容量 Kafka 主题组成的微批处理中出现瓶颈。它还为需要额外处理(如文本索引)的微批处理提供更快的加载。

2.4 - 对 Kafka 调度程序使用连接负载均衡

--dbhost 选项或配置文件中的 dbhost 条目中向调度程序提供 Vertica 节点的名称。调度程序连接到此节点以初始化它用于从 Kafka 加载数据而执行的所有语句。例如,每次执行微批处理时,调度程序都会连接到同一节点来运行 COPY 语句。如果使用单个节点作为所有调度程序操作的启动程序节点,这会影响该节点的性能,进而影响整个数据库的性能。

为了避免单个节点成为瓶颈,可以使用连接负载均衡将运行调度程序语句的负载分散到数据库中的多个节点上。连接负载均衡在负载均衡组内的节点之间分配客户端连接。有关此功能的概述,请参阅关于本机连接负载均衡

为调度程序启用连接负载均衡是一个两步骤过程:

  1. 为调度程序选择或创建负载均衡策略。

  2. 在调度程序中启用负载均衡。

为调度程序选择或创建负载均衡策略

连接负载均衡策略将来自特定网络地址集的传入连接重定向到一组节点。如果您的数据库已定义合适的负载均衡策略,那么可以直接使用它,而不是专门为调度程序创建一个。

如果数据库没有合适的策略,请创建一个。让您的策略将来自运行 Kafka 调度程序的主机的 IP 地址的连接重定向到数据库中的一组节点。您选择的节点组将充当调度程序所执行语句的启动程序。

以下示例演示了如何为三节点数据库中的所有三个节点设置负载均衡策略。调度程序在数据库中的节点 1 上运行,因此路由规则的源地址范围 (192.168.110.0/24) 包含数据库中节点的 IP 地址。示例的最后一步验证是否已对来自第一个节点(IP 地址 10.20.110.21)的连接进行负载均衡。

=> SELECT node_name,node_address,node_address_family FROM v_catalog.nodes;
    node_name     | node_address | node_address_family
------------------+--------------+----------------------
 v_vmart_node0001 | 10.20.110.21 | ipv4
 v_vmart_node0002 | 10.20.110.22 | ipv4
 v_vmart_node0003 | 10.20.110.23 | ipv4
(4 rows)


=> CREATE NETWORK ADDRESS node01 ON v_vmart_node0001 WITH '10.20.110.21';
CREATE NETWORK ADDRESS
=> CREATE NETWORK ADDRESS node02 ON v_vmart_node0002 WITH '10.20.110.22';
CREATE NETWORK ADDRESS
=> CREATE NETWORK ADDRESS node03 on v_vmart_node0003 WITH '10.20.110.23';
CREATE NETWORK ADDRESS

=> CREATE LOAD BALANCE GROUP kafka_scheduler_group WITH ADDRESS node01,node02,node03;
CREATE LOAD BALANCE GROUP
=> CREATE ROUTING RULE kafka_scheduler_rule ROUTE
   '10.20.110.0/24' TO kafka_scheduler_group;
CREATE ROUTING RULE
=> SELECT describe_load_balance_decision('10.20.110.21');
                     describe_load_balance_decision
--------------------------------------------------------------------------------
  Describing load balance decision for address [10.20.110.21]
Load balance cache internal version id (node-local): [2]
Considered rule [kafka_scheduler_rule] source ip filter [10.20.110.0/24]...
input address matches this rule
Matched to load balance group [kafka_scheduler_group] the group has policy [ROUNDROBIN]
number of addresses [3]
(0) LB Address: [10.20.110.21]:5433
(1) LB Address: [10.20.110.22]:5433
(2) LB Address: [10.20.110.23]:5433
Chose address at position [1]
Routing table decision: Success. Load balance redirect to: [10.20.110.23] port [5433]

(1 row)

在调度程序中启用负载均衡

客户端必须选择加入负载均衡,Vertica 才能将连接负载均衡策略应用到连接。例如,必须将 -C 标记传递给 vsql 命令,才能对您的交互式会话实现负载均衡。

调度程序使用 Java JDBC 库连接到 Vertica。要让调度程序选择加入负载均衡,必须将 JDBC 库的 ConnectionLoadBalance 选项设置为 1。有关详细信息,请参阅JDBC 中的负载均衡

使用 vkconfig 脚本的 --jdbc-opt 选项,或者将 jdbc-opt 选项添加到配置文件中以设置 ConnectionLoadBalance 选项。例如,要使用名为 weblog.conf 的配置文件从命令行启动调度程序,请使用以下命令:

$ nohup vkconfig launch --conf weblog.conf --jdbc-opt ConnectionLoadBalance=1 >/dev/null 2>&1 &

要永久启用负载均衡,可以将负载均衡选项添加到配置文件中。以下示例显示了设置调度程序内配置为使用连接负载均衡的示例中的 weblog.conf 文件。

username=dbadmin
password=mypassword
dbhost=10.20.110.21
dbport=5433
config-schema=weblog_sched
jdbc-opt=ConnectionLoadBalance=1

可通过查询 SESSIONS 表来检查是否正在对调度程序的连接进行负载均衡:

=> SELECT node_name, user_name, client_label FROM V_MONITOR.SESSIONS;
    node_name     | user_name |               client_label
------------------+-----------+-------------------------------------------
 v_vmart_node0001 | dbadmin   | vkstream_weblog_sched_leader_persistent_4
 v_vmart_node0001 | dbadmin   |
 v_vmart_node0002 | dbadmin   | vkstream_weblog_sched_lane-worker-0_0_8
 v_vmart_node0003 | dbadmin   | vkstream_weblog_sched_VDBLogger_0_9
(4 rows)

在 client_labels 列中,调度程序的连接具有以 vkstream 开头的标签(没有客户端标签的行是交互式会话)。可以看到调度程序打开的三个连接都进入了不同的节点。

2.5 - 使用偏移量限制加载

Kafka 会维护用户可配置的消息积压。默认情况下,新创建的调度程序读取 Kafka 主题中的所有消息,包括积压的所有消息,而不仅仅是调度程序启动后流出的消息。通常,您想要的就是这样。

然而,在某些情况下,您可能只想将源的一部分流传输到表中。例如,假定您想要分析您的电子商务站点在特定日期和时间的 Web 流量。然而,您的 Kafka 主题包含比您想要分析的时间更早的 Web 访问记录。在这种情况下,可以使用偏移量,将需要的数据流传输到 Vertica 进行分析。

另一个常见的用例是当您已经从 Kafka 手动加载了一些数据时(请参阅手动使用来自 Kafka 的数据)。现在,您希望流传输新到达的所有数据。默认情况下,调度程序会重新加载之前加载的所有数据(假定仍然可以从 Kafka 获得)。此时,可以使用偏移量来告诉调度程序从手动数据加载停止的位置开始自动加载数据。

配置调度程序从某个偏移量开始流传输

vkconfig 脚本的微批处理工具有一个 --offset 选项,使用它可以指定希望调度程序开始加载的源中消息的索引。此选项接受逗号分隔的索引值列表。除非使用 --partition 选项,否则必须为源中的每个分区提供一个索引值。此选项可用于选择应用偏移量的分区。在微批处理中设置偏移量时,调度程序不能运行。

如果微批处理定义了多个群集,请使用 --cluster 选项选择偏移量选项适用于哪个群集。类似地,如果微批处理有多个源,则必须使用 --source 选项选择一个。

例如,假定您只想从一个名为 web_hits 的源加载最后 1000 条消息。为了方便起见,假定源只包含单个分区,而微批处理只定义了单个群集和单个主题。

第一个任务是确定流结尾的当前偏移量。可以在一个 Kafka 节点上通过调用 GetOffsetShell 类执行此操作,将时间参数设置为 -1(主题的结尾):

$ path to kafka/bin/kafka-run-class.sh kafka.tools.GetOffsetShell \
                                          --broker-list kafka01:9092,kafka03:9092 --time -1 \
                                          --topic web_hits

{metadata.broker.list=kafka01:9092,kafka03:9092, request.timeout.ms=1000,
 client.id=GetOffsetShell, security.protocol=PLAINTEXT}
web_hits:0:8932

还可以使用 GetOffsetShell 查找流中出现在时间戳之前的偏移量。

在上面的示例中,web_hits 主题的单个分区的结尾偏移量为 8932。如果我们希望加载来自源的最后 1000 条消息,我们需要将微批处理的偏移量设置为 8932 - 1001 或 7931。

计算偏移量后,即可在微批处理配置中设置了。下面的例子:

  • 关闭调度程序,其配置信息存储在 weblog.conf 文件中。

  • 使用微批处理实用程序设置起始偏移量。

  • 重新启动调度程序。

$ vkconfig shutdown --conf weblog.conf
$ vkconfig microbatch --microbatch weblog --update --conf weblog.conf --offset 7931
$ nohup vkconfig launch --conf weblog.conf >/dev/null 2>&1 &

如果目标表为空,或者在调度程序启动之前被截断,那么表中将有 1000 行(直到通过源流传输更多的消息):

=> select count(*) from web_hits;
 count
-------
  1000
(1 row)

2.6 - 在 Vertica 升级后更新调度程序

调度程序仅与创建它的 Vertica 版本兼容。在 Vertica 版本之间,调度程序的配置架构或调度程序调用的 UDx 函数可能会发生更改。在升级 Vertica 后,必须更新调度程序才能收到这些更改。

当您将 Vertica 升级到新的主要版本或服务包时,使用 vkconfig 调度程序工具的 --upgrade 选项来更新调度程序。如果不更新调度程序,则在尝试启动它时将收到一条错误消息。例如:

$ nohup vkconfig launch --conf weblog.conf >/dev/null 2>&1 &
com.vertica.solutions.kafka.exception.FatalException: Configured scheduler schema and current
scheduler configuration schema version do not match. Upgrade configuration by running:
vkconfig scheduler --upgrade
    at com.vertica.solutions.kafka.scheduler.StreamCoordinator.assertVersion(StreamCoordinator.java:64)
    at com.vertica.solutions.kafka.scheduler.StreamCoordinator.run(StreamCoordinator.java:125)
    at com.vertica.solutions.kafka.Launcher.run(Launcher.java:205)
    at com.vertica.solutions.kafka.Launcher.main(Launcher.java:258)
Scheduler instance failed. Check log file. Check log file.
$ vkconfig scheduler --upgrade --conf weblog.conf
Checking if UPGRADE necessary...
UPGRADE required, running UPGRADE...
UPGRADE completed successfully, now the scheduler configuration schema version is v8.1.1
$ nohup vkconfig launch --conf weblog.conf >/dev/null 2>&1 &
                   .  .  .

3 - 监控消息使用情况

可以通过多种方式监控来自 Kafka 的数据流式传输的进度:

  • 监控 Vertica 向其报告进度的使用者组。如果要用于监控数据加载情况的工具能够与 Kafka 协调使用,那么这种技术最好。

  • 使用 vkconfig 工具中内置的监控 API。这些 API 以 JSON 格式报告流式传输调度程序的配置和使用情况。如果您正在开发自己的监控脚本,或者您的监控工具可以使用 JSON 格式的状态信息,这些 API 会非常有用。

3.1 - 通过使用者组监控 Vertica 消息使用情况

Apache Kafka 具有名为使用者组的功能,可帮助在多组使用者之间分配消息使用负载。采用使用者组时,Kafka 会根据组中的使用者数量来平均分配消息。使用者会向 Kafka 代理报告已成功读取的消息。此报告可帮助 Kafka 管理主题分区中的消息偏移量,以便不会向组中的任何使用者发送两次相同的消息。

在管理负载分配或防止重复加载消息方面,Vertica 不依赖于 Kafka 的使用者组。流式传输作业调度程序会自行管理主题分区偏移量。

即使 Vertica 不需要使用者组来管理偏移量,它也会向 Kafka 代理报告已使用的消息。借助此功能,您可以在 Vertica 群集加载消息时使用第三方工具监控其进度。默认情况下,Vertica 会向名为 vertica-databaseName(其中 databaseName 是 Vertica 数据库的名称)的使用者组报告其进度。在定义调度程序时或手动加载数据期间,可以更改 Vertica 向其报告进度的使用者组的名称。在加载数据时,第三方工具可以让 Kafka 代理监控 Vertica 群集的进度。

例如,可以使用 Kafka 的 kafka-consumer-groups.sh 脚本(位于 Kafka 安装的 bin 目录中)查看 Vertica 使用者组的状态。以下示例演示了如何列出 Kafka 群集中定义的可用使用者组并显示 Vertica 使用者组的详细信息:

$ cd /path/to/kafka/bin
$ ./kafka-consumer-groups.sh --list --bootstrap-server localhost:9092
Note: This will not show information about old Zookeeper-based consumers.

vertica-vmart
$ ./kafka-consumer-groups.sh --describe --group vertica-vmart \
   --bootstrap-server localhost:9092
Note: This will not show information about old Zookeeper-based consumers.

Consumer group 'vertica-vmart' has no active members.

TOPIC                          PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG        CONSUMER-ID                                       HOST                           CLIENT-ID
web_hits                       0          24500           30000           5500       -                                                 -                              -

从输出中,可以看到 Vertica 向 vertica-vmart 使用者组报告消息使用情况。此组是 Vertica 加载示例 VMart 数据库时的默认使用者组。第二个命令列出了 vertica-vmart 使用者组正在使用的主题。可以看到 Vertica 群集已读取主题唯一分区中的 24500 条消息(总共 30000 条)。稍后,运行相同的命令将显示 Vertica 群集的进度:

$ cd /path/to/kafka/bin
$ ./kafka-consumer-groups.sh --describe --group vertica-vmart \
    --bootstrap-server localhost:9092
Note: This will not show information about old Zookeeper-based consumers.

Consumer group 'vertica-vmart' has no active members.

TOPIC                          PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG        CONSUMER-ID                                       HOST                           CLIENT-ID
web_hits                       0          30000           30000           0          -

更改 Vertica 向其报告进度的使用者组

可以更改 Vertica 在使用消息时向其报告进度的使用者组。

使用调度程序自动加载时进行更改

使用调度程序时,可通过将 --consumer-group-id 实参设为 vkconfig 脚本的调度程序微批处理实用程序来设置使用者组。例如,假设您希望设置调度程序中显示的示例调度程序向名为 vertica-database 的使用者组报告其使用情况。那么,可以使用以下命令:

$ /opt/vertica/packages/kafka/bin/vkconfig microbatch --update \
    --conf weblog.conf --microbatch weblog --consumer-group-id vertica-database

当调度程序开始加载数据时,它将开始更新新的使用者组。使用 kafka-consumer-groups.sh 可以在 Kafka 节点上看到此新使用者组。

使用 --list 选项可返回使用者组:

$ /path/to/kafka/bin/kafka-consumer-groups.sh --list --bootstrap-server localhost:9092
Note: This will not show information about old Zookeeper-based consumers.

vertica-database
vertica-vmart

使用 --describe--group 选项可返回有关特定使用者组的详细信息:

$ /path/to/kafka/bin/kafka-consumer-groups.sh --describe --group vertica-database \
                                          --bootstrap-server localhost:9092
Note: This will not show information about old Zookeeper-based consumers.

Consumer group 'vertica-database' has no active members.

TOPIC                          PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG        CONSUMER-ID                                       HOST                           CLIENT-ID
web_hits                       0          30300           30300           0          -                                                 -                              -

手动加载时进行更改

要在手动加载数据时更改使用者组,请使用 KafkaSource 函数的 group_id 参数:

=> COPY web_hits SOURCE KafkaSource(stream='web_hits|0|-2',
                                    brokers='kafka01.example.com:9092',
                                    stop_on_eof=True,
                                    group_id='vertica_database')
                 PARSER KafkaJSONParser();
 Rows Loaded
-------------
       50000
(1 row)

加载消息时采用使用者组偏移量

可以选择让调度程序、手动加载或自定义加载脚本从使用者组的偏移量处开始加载消息。要从存储在使用者组中的最后偏移量处加载消息,请使用特殊的偏移量 -3。

使用调度程序自动加载的示例

要指示调度程序从使用者组已保存的偏移量处加载消息,请使用 vkconfig 脚本微批处理工具的 --offset 实参。

  1. 停止使用 shutdown 命令的调度程序以及用于创建调度程序的配置文件:

    $ /opt/vertica/packages/kafka/bin/vkconfig microbatch shutdown --conf weblog.conf
    
  2. 将微批处理 --offset 选项设为 -3:

    $ /opt/vertica/packages/kafka/bin/vkconfig microbatch --update --conf weblog.conf --microbatch weblog --offset -3
    

这会将调度程序从其读取的所有主题分区的偏移量设置为 -3。调度程序将采用使用者组已保存的偏移量开始下一次加载,并且所有后续加载均将使用保存在 stream_microbatch_history 中的偏移量。

手动加载示例

以下示例将从 web_hits 主题加载消息,该主题具有包含 51,000 条消息的分区。有关使用 KafkaSource 进行手动加载的详细信息,请参阅手动使用来自 Kafka 的数据

  1. 第一个 COPY 语句将创建名为 vertica_manual 的使用者组,并从 web_hits 主题中的第一个分区加载前 50,000 条消息:

    => COPY web_hits
       SOURCE KafkaSource(stream='web_hits|0|0|50000',
                                  brokers='kafka01.example.com:9092',
                                  stop_on_eof=True,
                                  group_id='vertica_manual')
       PARSER KafkaJSONParser()
       REJECTED DATA AS TABLE public.web_hits_rejections;
     Rows Loaded
    -------------
           50000
    (1 row)
    
  2. 下一个 COPY 语句将传递 -3 作为 start_offset 流参数,以便从使用者组已保存的偏移量处进行加载:

    => COPY web_hits
       SOURCE KafkaSource(stream='web_hits|0|-3',
                                  brokers='kafka01.example.com:9092',
                                  stop_on_eof=True,
                                  group_id='vertica_manual')
       PARSER KafkaJSONParser()
       REJECTED DATA AS TABLE public.web_hits_rejections;
     Rows Loaded
    -------------
            1000
    (1 row)
    

禁用使用者组报告

默认情况下,Vertica 会向 Kafka 报告其使用的消息的偏移量。如果没有专门为 Vertica 配置使用者组,它仍会向名为 vertica_database-name(其中 database- name 是 Vertica 当前正在运行的数据库的名称)的使用者组报告其偏移量。

如果要完全禁止 Vertica 向 Kafka 报告其使用情况,可以将使用者组设为空字符串或 NULL。例如:

=> COPY web_hits SOURCE KafkaSource(stream='web_hits|0|-2',
                                    brokers='kafka01.example.com:9092',
                                    stop_on_eof=True,
                                    group_id=NULL)
                 PARSER KafkaJsonParser();
 Rows Loaded
-------------
       60000
(1 row)

3.2 - 从 vkconfig 获取配置和统计信息

vkconfig 工具有两个可帮助检查调度程序的配置并监控数据加载情况的功能:

  • 配置调度程序(调度程序、群集、源、目标、加载规范和微批处理)的 vkconfig 工具具有 --read 实参,可让这些工具在调度程序中输出其当前设置。

  • vkconfig 统计信息工具可用于获取有关微批处理的统计信息。根据日期和时间范围、群集、分区及其他条件,可以筛选微批处理记录。

上述两个功能均以 JSON 格式输出数据。可以使用能够使用 JSON 数据的第三方工具或编写自己的脚本来处理配置和统计信息数据。

此外,还可以通过查询调度程序架构中的配置表来访问这些 vkconfig 选项提供的数据。但是,您可能会发现这些选项变得更加易于使用,因为它们不需要您连接到 Vertica 数据库。

获取配置信息

--read 选项传递到 vkconfig 的配置工具即可获取该工具可以设置的选项的当前设置。此输出采用 JSON 格式。以下示例演示了如何为 weblog.conf 配置文件中定义的调度程序从调度程序和群集工具中获取配置信息:

$ vkconfig scheduler --read --conf weblog.conf
{"version":"v9.2.0", "frame_duration":"00:00:10", "resource_pool":"weblog_pool",
 "config_refresh":"00:05:00", "new_source_policy":"FAIR",
 "pushback_policy":"LINEAR", "pushback_max_count":5, "auto_sync":true,
 "consumer_group_id":null}

$ vkconfig cluster --read --conf weblog.conf
{"cluster":"kafka_weblog", "hosts":"kafak01.example.com:9092,kafka02.example.com:9092"}

--read 选项将列出该工具在调度程序架构中创建的所有值。例如,如果已在调度程序中定义多个目标,则 --read 选项会列出所有目标。

$ vkconfig target --list --conf weblog.conf
{"target_schema":"public", "target_table":"health_data"}
{"target_schema":"public", "target_table":"iot_data"}
{"target_schema":"public", "target_table":"web_hits"}

可以使用 vkconfig 工具接受的其他实参来筛选 --read 选项输出。例如,在群集工具中,可以使用 --host 实参将输出限制为仅显示包含特定主机的群集。这些实参支持 LIKE 谓词通配符,因此可以匹配部分值。有关使用通配符的详细信息,请参阅 LIKE 谓词

以下示例演示了如何使用 --host 实参来筛选群集工具的 --read 选项的输出。第一次调用显示未经筛选的输出。第二次调用可筛选输出,以仅显示以“kafka”开头的群集:

$ vkconfig cluster --read --conf weblog.conf
{"cluster":"some_cluster", "hosts":"host01.example.com"}
{"cluster":"iot_cluster",
 "hosts":"kafka-iot01.example.com:9092,kafka-iot02.example.com:9092"}
{"cluster":"weblog",
 "hosts":"web01.example.com.com:9092,web02.example.com:9092"}
{"cluster":"streamcluster1",
 "hosts":"kafka-a-01.example.com:9092,kafka-a-02.example.com:9092"}
{"cluster":"test_cluster",
 "hosts":"test01.example.com:9092,test02.example.com:9092"}

$ vkconfig cluster --read --conf weblog.conf --hosts kafka%
{"cluster":"iot_cluster",
 "hosts":"kafka-iot01.example.com:9092,kafka-iot02.example.com:9092"}
{"cluster":"streamcluster1",
 "hosts":"kafka-a-01.example.com:9092,kafka-a-02.example.com:9092"}

有关详细信息,请参阅群集工具选项加载规范工具选项微批处理工具选项调度程序工具选项目标工具选项源工具选项

获取流式传输数据加载统计信息

vkconfig 脚本的统计信息工具可用于查看调度程序微批处理的历史记录。可以使用以下条件的任意组合来筛选结果:

  • 微批处理的名称

  • 作为数据加载源的 Kafka 群集

  • 主题的名称

  • 主题内的分区

  • 数据加载所针对的 Vertica 架构和表

  • 日期和时间范围

  • 最新的微批处理

有关此工具中提供的所有选项,请参阅统计信息工具选项

以下示例将获取调度程序运行的最后两个微批处理:

$ vkconfig statistics --last 2 --conf weblog.conf
{"microbatch":"weblog", "target_schema":"public", "target_table":"web_hits",
 "source_name":"web_hits", "source_cluster":"kafka_weblog", "source_partition":0,
 "start_offset":73300, "end_offset":73399, "end_reason":"END_OF_STREAM",
 "end_reason_message":null, "partition_bytes":19588, "partition_messages":100,
 "timeslice":"00:00:09.807000", "batch_start":"2018-11-02 13:22:07.825295",
 "batch_end":"2018-11-02 13:22:08.135299", "source_duration":"00:00:00.219619",
 "consecutive_error_count":null, "transaction_id":45035996273976123,
 "frame_start":"2018-11-02 13:22:07.601", "frame_end":null}
{"microbatch":"weblog", "target_schema":"public", "target_table":"web_hits",
 "source_name":"web_hits", "source_cluster":"kafka_weblog", "source_partition":0,
 "start_offset":73200, "end_offset":73299, "end_reason":"END_OF_STREAM",
 "end_reason_message":null, "partition_bytes":19781, "partition_messages":100,
 "timeslice":"00:00:09.561000", "batch_start":"2018-11-02 13:21:58.044698",
 "batch_end":"2018-11-02 13:21:58.335431", "source_duration":"00:00:00.214868",
 "consecutive_error_count":null, "transaction_id":45035996273976095,
 "frame_start":"2018-11-02 13:21:57.561", "frame_end":null}

以下示例将从名为 web_hits 的源获取介于 2018 年 11 月 2 日 13:21:00 到 13:21:20 之间的微批处理:

$ vkconfig statistics --source "web_hits" --from-timestamp \
           "2018-11-02 13:21:00" --to-timestamp "2018-11-02 13:21:20"  \
           --conf weblog.conf
{"microbatch":"weblog", "target_schema":"public", "target_table":"web_hits",
 "source_name":"web_hits", "source_cluster":"kafka_weblog", "source_partition":0,
 "start_offset":72800, "end_offset":72899, "end_reason":"END_OF_STREAM",
 "end_reason_message":null, "partition_bytes":19989, "partition_messages":100,
 "timeslice":"00:00:09.778000", "batch_start":"2018-11-02 13:21:17.581606",
 "batch_end":"2018-11-02 13:21:18.850705", "source_duration":"00:00:01.215751",
 "consecutive_error_count":null, "transaction_id":45035996273975997,
 "frame_start":"2018-11-02 13:21:17.34", "frame_end":null}
{"microbatch":"weblog", "target_schema":"public", "target_table":"web_hits",
 "source_name":"web_hits", "source_cluster":"kafka_weblog", "source_partition":0,
 "start_offset":72700, "end_offset":72799, "end_reason":"END_OF_STREAM",
 "end_reason_message":null, "partition_bytes":19640, "partition_messages":100,
 "timeslice":"00:00:09.857000", "batch_start":"2018-11-02 13:21:07.470834",
 "batch_end":"2018-11-02 13:21:08.737255", "source_duration":"00:00:01.218932",
 "consecutive_error_count":null, "transaction_id":45035996273975978,
 "frame_start":"2018-11-02 13:21:07.309", "frame_end":null}

有关使用此工具的更多示例,请参阅统计信息工具选项

4 - 解析自定义格式

要处理 Kafka 数据流,解析器必须识别每条消息之间的边界。Vertica 提供的 Kafka 解析器可以识别 AvroJSON原始数据格式的边界,但您的数据流可能会使用自定义格式。为解析自定义格式,Vertica 提供了筛选器,可在数据流到达解析器之前在数据流中插入边界信息。

Kafka 筛选器

Vertica 提供以下筛选器:

  • KafkaInsertDelimiters:在数据流中的每条消息之间插入用户指定的分隔符。该分隔符可以包含任意字符,具有任意长度。此解析器使用以下语法:

    KafkaInsertDelimiters(delimiter = 'delimiter')
  • KafkaInsertLengths:在消息的开头插入消息长度(以字节为单位)。Vertica 按 big-endian 网络字节顺序将长度写入为 4 字节 uint32 值。例如,100 字节的消息会加上 0x00000064 前缀。此解析器使用以下语法:

    KafkaInsertLengths()

除了使用其中一个 Kafka 筛选器之外,还可以在单个 COPY 语句中添加一个或多个用户定义的筛选器。请以逗号分隔的列表指定多个筛选器,并先列出 Vertica 筛选器。如果使用非 Kafka 解析器,则必须至少使用一个筛选器为解析器准备数据流,否则解析器将失败并返回错误。

示例

以下 COPY 语句将从名为 iot-data 的主题中的两个分区加载逗号分隔值。处理完两个分区中的所有消息后,加载将退出。KafkaInsertDelimiters 筛选器将在 Kafka 消息之间插入换行符,以将其转换为传统的数据行。该语句将使用标准 COPY 解析器以逗号分隔 CSV 值:

=> COPY kafka_iot SOURCE KafkaSource(stream='iot-data|0|-2,iot-data|1|-2',
                                     brokers='kafka01:9092',
                                     stop_on_eof=True)
                  FILTER KafkaInsertDelimiters(delimiter = E'\n')
                  DELIMITER ',';
 Rows Loaded
-------------
        3430
(1 row)