这是本节的多页打印视图。 点击此处打印.

返回本页常规视图.

Apache Kafka 集成

Vertica 提供了与开源分布式实时流媒体平台 Apache Kafka 集成的高性能机制。由于 Vertica 既可以使用 Kafka 中的数据,也可以为 Kafka 生成数据,因此可以在执行自动分析工作流的过程中使用 Vertica:Vertica 可以从 Kafka 中检索数据,对数据进行分析,然后将结果发回 Kafka 供其他应用程序使用。

先决条件

架构和概述

Vertica 和 Kafka 的集成提供了以下功能:

  • 一个 UDx 库,其中包含将数据从 Kafka 主题加载到 Vertica 并对数据进行解析的函数

  • 使用 UDL 库的作业调度程序可持续消耗来自 Kafka 的数据,并利用“只执行一次”语义

  • 基于推送的通知程序,它将数据收集器消息从 Vertica 发送到 Kafka

  • 一个 KafkaExport 函数,它将 Vertica 数据发送到 Kafka

Vertica 作为 Kafka 的使用者

Kafka 使用者读取由其他数据流写入 Kafka 的消息。由于 Vertica 可以读取来自 Kafka 的消息,因此您可以存储并分析来自任何向 Kafka 发送数据的应用程序中的数据,而无需将每个单独的应用程序配置为连接到 Vertica。Vertica 提供了可用于自动或手动使用从 Kafka 加载的数据的工具。

手动加载

通过直接执行 COPY 语句从 Kafka 手动加载有限数量的数据。如果您想要对一组消息进行分析、测试或执行其他处理,这会非常有用。

有关详细信息,请参阅使用来自 Kafka 的数据

自动加载

使用作业调度程序从 Kafka 自动加载数据。调度程序不断加载数据,并确保每条 Kafka 消息只加载一次。

必须在运行调度程序的每个 Vertica 节点上安装 Java 8。有关详细信息,请参阅使用调度程序自动使用来自 Kafka 的数据

Vertica 作为 Kafka 的生产者

Kafka 生产者将数据发送给 Kafka,数据随后可供 Kafka 使用者进行处理。可以将以下类型的数据发送到 Kafka:

  • Vertica 分析结果。使用 KafkaExport 导出 Vertica 表和查询。

  • 数据收集器表中的运行状况和性能数据。创建基于推送的通知程序,发送此数据供第三方监控工具使用。

  • 临时消息。使用 NOTIFY 表示存储过程等任务已完成。

有关详细信息,请参阅为 Kafka 生成数据

1 - 数据流式传输集成术语

Vertica 使用以下术语来描述其流式传输功能。这些是通用术语,可能与每个特定流式传输平台的术语不同。

术语

主机
数据流式传输服务器。
通用类别的消息源,可将消息流式传输到相同的 Vertica 目标表。在 Apache Kafka 中,源称为主题。
分区
数据流式传输中的并行度单位。数据流式传输将源分为多个分区,这些分区可供使用者(例如 Vertica 数据库)并行使用。在一个分区内,消息通常按时间顺序排列。
偏移量
分区内的索引。此索引是指有序消息队列中的位置,而不是不透明字节流中的索引。
消息
数据流式传输中的数据单位。数据通常为 JSON 或 Avro 格式。消息会作为行加载到 Vertica 表中,并通过其源、分区和偏移量进行唯一标识。

数据加载程序术语

调度程序
一种外部工具,用于对将数据从流式数据传输源加载到 Vertica 进行调度。
微批处理
微批处理表示从流式数据传输源加载的数据的单个片段。它包含调度程序在执行从流式数据传输源到 Vertica 的加载时所需的所有信息。
时间范围
调度程序执行微批处理以加载数据的时间窗口。此窗口控制调度程序作为微批处理一部分运行每个 COPY 语句的持续时间。在该时间范围,调度程序为来自每个源的活动微批处理提供加载数据的机会。它根据先前微批处理的历史记录优先考虑需要更多时间来加载数据的微批处理。
由源和分区标识的消息源。

偏移量可唯一标识特定源-分区流内的位置。

通道
作业调度程序实例中的一个线程,用来发布微批处理来执行加载操作。

可用通道数基于作业调度程序资源池的 PlannedConcurrency。多通道允许调度程序在某个时间范围内为不同的源并行运行微型批处理。

2 - 配置 Vertica 和 Kafka

Vertica 和 Kafka 都有可用来优化流式传输数据加载的设置。此部分中的主题将介绍这些设置。

2.1 - Kafka 和 Vertica 配置设置

可以微调 Vertica 和 Kafka 的配置设置,从而优化性能。

Vertica 和 Kafka 集成使用的是 rdkafka 版本 0.11.6。除非另有说明,否则以下配置设置使用默认的 librdkafka 配置属性值。有关如何覆盖默认值的说明,请参阅直接设置 Kafka 库选项

Vertica 生产者设置

以下设置更改 Vertica 如何使用 KafkaExport 函数和通知程序为 Kafka 生成数据。

queue.buffering.max.messages
Vertica 生产者队列的大小。如果 Vertica 以过快的速度生成过多消息,那么队列可能会填满,从而导致丢弃消息。增加此值会消耗更多内存,但是会减少消息丢失的机会。

默认值

  • KafkaExport:1000

  • 通知程序:10000

queue.buffering.max.ms
Vertica 刷新生产者消息队列的频率。虽然较低的值会减少延迟,但以吞吐量为代价。较高的值会增加吞吐量,但会导致生产者队列(由 queue.buffer.max.messages 设置)更频繁地填满,从而导致丢弃消息。

默认值: 100 ms

message.max.bytes
Kafka 协议请求消息批次的最大大小。此值在源、代理和生产者上都应该相同。
message.send.max.retries
生产者尝试将消息传递给代理的次数。较高的值会增加成功机会。
retry.backoff.ms
Vertica 在重新发送失败的消息前等待的间隔。
request.required.acks
Kafka 在认为消息传递成功之前需要的代理副本确认数。需要确认会增加延迟。移除确认会增加消息丢失的风险。
request.timeout.ms
生产者等待代理响应的间隔。代理响应时间受服务器负载和所需消息确认数的影响。
较高的值会增加延迟。
compression.type
在将数据发送给代理之前用于编码数据的压缩算法。压缩有助于减少 Vertica 生产者的网络占用空间并提高磁盘利用率。Vertica 支持 gzipsnappy

Kafka 代理设置

Kafka 代理接收来自生产者的消息,并在 Kafka 使用者之间分配它们。在代理本身上配置这些设置。这些设置独立于生产者和使用者设置。有关 Apache Kafka 代理设置的详细信息,请参考 Apache Kafka 文档

message.max.bytes
Kafka 协议请求消息批次的最大大小。此值在源、代理和生产者上应该相同。
num.io.threads
代理用于接收和处理请求的网络线程数。更多的线程可以提高并发性。
num.network.threads
代理用于接受网络请求的网络线程数。更多的线程可以提高并发性。

Vertica 使用者设置

以下设置更改 Vertica 在使用来自 Kafka 的数据时的行为方式。当直接执行 COPY 语句时,可以在 KafkaSource UDL 上使用 kafka_conf 参数设置此值。对于调度程序,请使用调度程序工具中的 --message_max_bytes 设置。

message.max.bytes
Kafka 协议请求消息批次的最大大小。请将该值设置为足够高的值,以防止提取消息批次的开销干扰加载数据。对于新创建的加载规范,默认为 24MB。

2.2 - 直接设置 Kafka 库选项

Vertica 依赖于开源的 rdkafka 库与 Apache Kafka 通信。此库包含用于控制 Vertica 和 Kafka 如何交互的许多选项。可以通过 vkconfig 实用程序和 Kafka 集成函数(如 KafkaSource)中的设置来设置最常见的 rdkafka 库选项。

有些 rdkafka 设置不能直接从 Vertica 中进行设置。一般情况下,您不需要更改它们。但是,如果您发现需要设置一个不能直接从 Vertica 进行的特定 rdkafka 设置,可以将相关选项通过 kafka_conf 选项直接传递到 rdkafka 库。

当使用调度程序从 Kafka 加载数据时,支持 kafka_conf 实参。可以通过以下方式设置这些值(按照优先级从低到高的顺序列出):

  • 在运行 vkconfig 实用程序的主机上设置的 Linux 环境变量 VERTICA_RDKAFKA_CONF

  • 在运行 vkconfig 实用程序的主机上设置的 Linux 环境变量 VERTICA_RDKAFKA_CONF_KAFKA_CLUSTER。该变量名称的 KAFKA_CLUSTER 部分是您已经使用 vkconfig 的群集实用程序定义的 Kafka 群集的名称。该环境变量中的设置仅影响在 KAFKA_CLUSTER 中指定的特定 Kafka 群集。

  • vkconfig 实用程序的 --kafka_conf 选项。可以在 clustersourcelaunch 和同步工具中设置此选项。请注意,该设置仅适用于每个 vkconfig 实用程序调用,而不会延续到其他 vkconfig 实用程序调用。例如,如果您需要为 cluster 和 source 工具提供一个选项,那么必须同时为它们提供 kafka_conf 选项。

所有这些选项都是级联的,因此,如果将使用 --kafka_conf 实参的选项设置为群集工具,在环境变量中设置的相同选项会被覆盖。

当直接调用 KafkaExport、KafkaSource 和多个其他 Kafka 集成函数时,也可以直接设置 rdkafka 选项。这些函数接受名为 kafka_conf 的参数。

kafka_conf 选项设置

kafka_conf vkconfig 选项接受一个 JSON 对象,设置如下格式:

  • 一个或多个选项/值对:

    --kafka_conf '{"option1":value1[, "option2":value2...]}'
    
  • 带有多个值的单个选项:

    --kafka_conf '{"option1":"value1[;value2...]"}'
    

有关 rdkafka 库支持的配置选项列表,请参阅 GitHub 上的 rdkafka 项目

示例

以下示例演示了如何在使用 KafkaSource 手动加载消息时禁用 rdkafka api.version.request 的选项。当访问运行 0.9 或更早版本的 Kafka 群集时,应该禁用此选项。有关详细信息,请参阅为 Apache Kafka 0.9 及更早版本配置 Vertica

=> CREATE FLEX TABLE iot_data();
CREATE TABLE
=> COPY public.iot_data SOURCE KafkaSource(stream='iot_json|0|-2',
                                           brokers='kafka-01.example.com:9092',
                                           stop_on_eof=True,
                                           kafka_conf='{"api.version.request":false}')
        PARSER KafkaJSONParser();
 Rows Loaded
-------------
        5000
(1 row)

以下示例演示了如何在调用群集工具时使用 JSON 对象设置两个选项。它禁用 api.version.request 选项,并使用 check.crcs 选项启用来自 Kafka 的消息的 CRC 检查:

$ vkconfig cluster --create --cluster StreamCluster1 \
                   --hosts kafka01.example.com:9092,kafka02.example.com:9092 \
                   --conf myscheduler.config \
                   --kafka_conf '{"api.version.request":false, "check.crcs":true}'

以下示例演示了如何使用环境变量设置相同的选项:

$ export VERTICA_RDKAFKA_CONF='{"api.version.request":false, "check.crcs":true}'
$ vkconfig cluster --create --cluster StreamCluster1 \
                   --hosts kafka01.example.com:9092,kafka02.example.com:9092 \
                   --conf myscheduler.config

2.3 - 为 Apache Kafka 0.9 及更早版本配置 Vertica

Apache Kafka 0.10 版本引入了一个新功能,它让使用者能够决定 Kafka 代理支持哪个版本的 Kafka API。支持该功能的使用者会向 Kafka 代理发送一个初始 API 版本查询,以确定与它们通信时使用的 API 版本。运行 0.9.0 或更早版本的 Kafka 代理无法响应此 API 查询。如果使用者在超时时间(默认设置为 10 秒)内没有收到来自 Kafka 代理的回复,则使用者可以假定 Kafka 代理运行的是 Kafka 0.9.0 或更早版本。

Vertica 与 Kafka 的集成从 9.1.1 版本开始支持此 API 查询功能。如果将 Vertica 连接到运行 0.9.0 或更早版本的 Kafka 群集,此 API 检查可能会导致问题。您可能会注意到从 Kafka 加载消息的性能较差,并且可能会遇到错误,因为 10 秒的 API 请求超时会导致部分 Kafka 集成功能超时并报告错误。

例如,如果在 Kafka 0.9 群集上通过运行 vkconfig 源实用程序来配置源,则可能会出现以下错误:

$ vkconfig source --create --conf weblog.conf --cluster kafka_weblog --source web_hits
Exception in thread "main" com.vertica.solutions.kafka.exception.ConfigurationException:
ERROR: [[Vertica][VJDBC](5861) ERROR: Error calling processPartition() in
User Function KafkaListTopics at [/data/build-centos6/qb/buildagent/workspace/jenkins2/PrimaryBuilds/build_master/build/udx/supported/kafka/KafkaUtil.cpp:173],
error code: 0, message: Error getting metadata: [Local: Broker transport failure]]
at com.vertica.solutions.kafka.model.StreamSource.validateConfiguration(StreamSource.java:184)
at com.vertica.solutions.kafka.model.StreamSource.setFromMapAndValidate(StreamSource.java:130)
at com.vertica.solutions.kafka.model.StreamModel.<init>(StreamModel.java:89)
at com.vertica.solutions.kafka.model.StreamSource.<init>(StreamSource.java:39)
at com.vertica.solutions.kafka.cli.SourceCLI.getNewModel(SourceCLI.java:53)
at com.vertica.solutions.kafka.cli.SourceCLI.getNewModel(SourceCLI.java:15)
at com.vertica.solutions.kafka.cli.CLI.run(CLI.java:56)
at com.vertica.solutions.kafka.cli.CLI._main(CLI.java:132)
at com.vertica.solutions.kafka.cli.SourceCLI.main(SourceCLI.java:25)
Caused by: java.sql.SQLNonTransientException: [Vertica][VJDBC](5861)
ERROR: Error calling processPartition() in User Function KafkaListTopics at [/data/build-centos6/qb/buildagent/workspace/jenkins2/PrimaryBuilds/build_master/build/udx/supported/kafka/KafkaUtil.cpp:173],
error code: 0, message: Error getting metadata: [Local: Broker transport failure]
at com.vertica.util.ServerErrorData.buildException(Unknown Source)
                           . . .

使用流作业调度程序时禁用 API 版本请求

为了避免出现这些问题,必须禁用 Vertica 用于与 Kafka 通信的 rdkafka 库中的 API 版本请求功能。要想在使用调度程序时禁用此设置,最简单的方法是在运行 vkconfig 脚本的主机上设置一个 Linux 环境变量。使用该环境变量可以确保对 vkconfig 的每次调用都包含禁用 API 版本请求的设置。vkconfig 脚本检查以下两个变量:

  • VERTICA_RDKAFKA_CONF 适用于所有与主机上运行的调度程序通信的 Kafka 群集。

  • VERTICA_RDKAFKA_CONF_CLUSTER_NAME 只适用于名为 CLUSTER_NAME 的 Kafka 群集。如果您的调度程序与多个 Kafka 群集通信,其中一些没有运行 0.9 或更早版本,那么请使用此变量。

如果同时设置这两个变量,则特定于群集的变量优先。如果调度程序所连接到的大多数群集运行的是 Kafka 0.9 或更早版本,但少数运行的是 0.10 或更高版本,那么此功能非常有用。在这种情况下,可以使用 VERTICA_RDKAFKA_CONF 变量禁用大多数群集的 API 版本检查,并使用 VERTICA_RDKAFKA_CONF_CLUSTER_NAME 为特定群集重新启用检查。

如果只有少数 Kafka 群集运行 0.9 或更早版本,可以为它们设置特定于群集的变量,并为大多数群集保留默认值。

环境变量的内容是一个 JSON 对象,它通知 rdkafka 库是否查询 Kafka 群集,看群集中是否存在该对象支持的 API 版本:

'{"api.version.request":false}'

要在 BASH 中设置环境变量,请使用 export 命令:

$ export VERTICA_RDKAFKA_CONF='{"api.version.request":false}'

如果想让该设置仅影响名为 kafka_weblog 的群集,则使用以下命令:

$ export VERTICA_RDKAFKA_CONF_kafka_weblog='{"api.version.request":false}'

可以将该命令添加到任何普通用户环境配置文件(如 ~/.bash_profile)或系统范围的文件(如 /etc/profile)中。必须确保对所有可能运行 vkconfig 脚本的用户(包括由 init 等守护程序进程发出的任何调用)使用相同的设置。还可以在用于设置、配置或启动调度程序的任何脚本中直接包含此命令。

在直接加载消息时禁用 API 版本请求

使用 kafka_conf 参数直接调用 KafkaSource 加载消息时,可以禁用 API 请求:

=> CREATE FLEX TABLE iot_data();
CREATE TABLE
=> COPY public.iot_data SOURCE KafkaSource(stream='iot_json|0|-2',
                                           brokers='kafka-01.example.com:9092',
                                           stop_on_eof=True,
                                           kafka_conf='{"api.version.request":false}')
        PARSER KafkaJSONParser();
 Rows Loaded
-------------
        5000
(1 row)

另请参阅

2.4 - 在 Kafka 0.11 及更高版本中对 message.max.bytes 设置的更改

在 Kafka 0.10 及更早版本中,message.max.bytes 设置用来配置单个消息允许的最大大小。从 0.11 版开始,Kafka 开始按批次对消息进行分组。该版本将此设置的含义更改为一个消息批次允许的最大大小。

这一更改会对 Vertica 数据库加载 Kafka 0.11 及更高版本中消息的速度产生明显的影响。如果使用 Vertica 9.1.0 或更早版本创建流作业调度程序,您可能会发现其加载规范的 message.max.bytes 值设置得太低。升级调度程序不会自动调整此设置。

将 Vertica 与 Kafka 0.11 及更高版本结合使用时,为了防止吞吐量变慢,请手动调整调度程序加载规范中的 message.max.bytes 设置。Vertica 的测试表明,该设置的最佳值是 25165824 (24 × 1024 × 1024) 字节。

以下示例演示了如何更新现有调度程序的加载规范 weblog_load 中的 message.max.bytes 设置,该调度程序使用名为 weblog.conf 的配置文件进行定义:

$ /opt/vertica/packages/kafka/bin/vkconfig load-spec --update \
     --load-spec weblog_load --conf weblog.conf --message-max-bytes 25165824

3 - Vertica Eon 模式和 Kafka

当 Vertica 在 Eon 模式下运行时,可以使用 Vertica 与 Apache Kafka 的集成。对于在云环境中运行的 Eon 模式,请考虑为群集中的调度程序使用更长的时间范围持续时间。总体而言,云基础架构会导致延迟更久,尤其在存储层与计算层分开时更是如此。如果不考虑延迟变得更久,您可能会体验到更低的数据吞吐量,因为更多的时间范围时间因云环境造成的开销而浪费。此外,使用更长的时间范围还有助于防止 Vertica 在从 Kafka 加载数据时创建许多小型 ROS 容器。使用更长的时间范围所付出的代价是数据加载会出现更久的延迟。有关详细信息,请参阅选择时间范围持续时间

4 - 使用来自 Kafka 的数据

Kafka 使用者订阅一个或多个由 Kafka 群集管理的主题。每个主题都是一个数据流,一个表示为有序消息序列的无界数据集。Vertica 可以手动或自动使用 Kafka 主题对您的流式传输数据进行分析。

手动使用数据

通过调用 KafkaSource 函数和解析器的 COPY 语句手动使用来自 Kafka 的数据。当您希望执行以下操作时,手动加载会非常有帮助:

  • 使用 Kafka 中当前存在的消息填充表一次。

  • 分析一组特定的消息。可以选择从 Kafka 流中加载的数据子集。

  • 在把调度程序设置为连续将数据流式传输到 Vertica 之前,探索 Kafka 流中的数据。

  • 以调度程序无法实现的方式控制数据加载。例如,在从 Kafka 加载数据期间无法执行业务逻辑或自定义拒绝处理,因为调度程序在其事务期间不支持其他处理。相反,可以定期运行一个事务来执行用来从 Kafka 加载数据的 COPY 语句,然后执行其他处理。

有关详细示例,请参阅 手动使用来自 Kafka 的数据

自动使用数据

通过调度程序(一个在数据到达时加载数据的命令行工具)自动使用从 Kafka 到 Vertica 的流式传输数据。调度程序以微批处理(一个工作单元,在指定的持续时间内处理单个 Kafka 主题的分区)定义的分段加载数据。可以使用 vkconfig 工具管理调度程序配置和选项。

有关详细信息,请参阅使用调度程序自动使用来自 Kafka 的数据

监控使用情况

必须监控消息使用情况以确保 Kafka 和 Vertica 有效地通信。可以使用本机 Kafka 工具监控使用者组,或者可以使用 vkconfig 工具查看详细使用信息。

有关其他信息,请参阅监控消息使用情况

使用 Kafka 筛选器解析数据

您的数据流可能会对默认情况下无法由 Kafka 解析器函数解析的数据进行编码。使用 Kafka 筛选器来分隔流中的消息,以改善数据使用情况。

有关详细信息,请参阅解析自定义格式

4.1 - 手动使用来自 Kafka 的数据

可以使用 COPY 语句将流式传输数据从 Kafka 手动加载到 Vertica 中,就像可以从一个文件或其他源加载一组有限数据一样。与标准数据源不同,Kafka 数据以消息流的形式连续到达,您必须先对这些消息进行解析才能将它们加载到 Vertica 中。在 COPY 语句中使用 Kafka 函数准备数据流。

此示例以增量式构建一个 COPY 语句,它从名为 web_hits 的 Kafka 主题手动加载 JSON 编码数据。web_hits 主题流式传输网站请求的服务器日志。

有关将数据加载到 Vertica 中的信息,请参阅数据加载

创建目标表

要确定目标表架构,必须确定消息结构。下面是 web_hits 流的一个示例:

{"url": "list.jsp", "ip": "144.177.38.106", "date": "2017/05/02 20:56:00",
"user-agent": "Mozilla/5.0 (compatible; MSIE 6.0; Windows NT 6.0; Trident/5.1)"}
{"url": "search/wp-content.html", "ip": "215.141.172.28", "date": "2017/05/02 20:56:01",
"user-agent": "Opera/9.53.(Windows NT 5.2; sl-SI) Presto/2.9.161 Version/10.00"}

此主题流式传输 JSON 编码数据。由于 JSON 数据不一致,并且可能包含不可预测的增加值,因此请将该数据流存储在一个 Flex 表中。Flex 表动态接受数据中出现的其他字段。

以下语句创建一个名为 web_table 的 Flex 表来存储该数据流:

=> CREATE FLEX TABLE web_table();

在 COPY 语句的开头,添加 web_table 作为目标表:

COPY web_table

有关 Flex 表的详细信息,请参阅 Flex 表

定义 KafkaSource

COPY 语句的源始终是 KafkaSource。KafkaSource 接受有关数据流、Kafka 代理和其他处理选项的详细信息以连续加载数据,直到满足结束条件

流详细信息

stream 参数定义您希望从一个或多个主题分区中加载的数据段。每个 Kafka 主题都将其消息拆分为不同的分区以获得可扩展的吞吐量。Kafka 根据 Kafka 管理员设置的规则为每个主题保留一个消息待完成项。可以选择加载待完成项中的部分或全部消息,或者仅加载当前流式传输的消息。

对于每个分区,stream 参数要求以竖线 (|) 分隔的列表形式提供主题名称、主题分区和分区偏移量。(可选)可以提供一个结束偏移量作为停止从数据流加载的结束条件:

'stream='topic_name|partition|start_offset[|end_offset]'

要从 web_hits 主题的单个分区加载整个待完成项,请使用 SOURCE 关键字附加具有以下 stream 参数值的 KafkaSource:

COPY ...
SOURCE KafkaSource(stream='web_hits|0|-2', ...

在前面的示例中:

  • web_hits 是要从中加载数据的主题的名称。

  • 0 是要从中加载数据的主题分区。主题分区从 0 开始建立索引,web_hits 仅包含一个分区。

  • -2 加载整个待完成项。这是一个特殊的偏移量值,它通知 KafkaSource 以最早可用的消息偏移量开始加载。

加载多个分区

此示例虽然仅从一个分区加载,但是请务必了解如何在一个 COPY 语句中从多个分区加载。

要从同一主题中的其他分区(甚至是其他主题)加载,请提供逗号分隔的列表,其中包括主题名称、分区号和由竖线分隔的偏移量值。例如,以下 stream 实参从 web_hits 主题的分区 0 到分区 2 加载整个待完成项:

KafkaSource(stream='web_hits|0|-2,web_hits|1|-2,web_hits|2|-2'...

当在同一 COPY 语句中加载多个分区时,可以将 executionparallelism 参数设置为定义针对 COPY 语句创建的线程数。理想情况下,希望每个分区使用一个线程。可以选择不指定值,让 Vertica 根据资源池中的分区数和可用资源确定线程数。在此例中,只有一个分区,因此不需要额外的线程来加载数据。

添加 Kafka 代理

KafkaSource 需要 Kafka 群集中代理的主机名(或 IP 地址)和端口号。Kafka 代理是 Vertica 为了检索 Kafka 数据而访问的服务。在以下示例中,Kafka 群集有一个名为 kafka01.example.com 的代理,该代理在端口 9092 上运行。请将 brokers 参数和值附加到 COPY 语句:

COPY ...
SOURCE KafkaSource(stream='web_hits|0|-2',
                   brokers='kafka01.example.com:9092', ...

选择结束条件

因为数据从 Kafka 连续到达,所以从 Kafka 手动加载需要定义一个结束条件来指示何时停止加载数据。除了流详细信息中描述的结束偏移量,还可以选择:

  • 在设置的一段时间内复制尽可能多的数据。

  • 一直加载数据,直到在超时时间内没有新数据到达。

  • 加载所有可用数据,而不等待任何进一步的数据到达。

以下示例运行 10000 毫秒(10 秒)的 COPY 以获取数据样本。如果 COPY 语句能够在 10 秒内加载整个数据待完成项,那么它将剩余的时间用于在流式传输数据到达时进行加载。此值在 duration 参数中设置。附加 duration 值以完成 KafkaSource 定义:

COPY ...
SOURCE KafkaSource(stream='web_hits|0|-2',
                    brokers='kafka01.example.com:9092',
                    duration=interval '10000 milliseconds')

如果从 Kafka 启动具有较长持续时间的 COPY 语句,并且需要停止它,可以调用一个函数(例如 CLOSE_ALL_SESSIONS)来关闭其会话。

选择解析器

Kafka 不强制其数据流上的消息格式。消息通常采用 Avro 或 JSON 格式,但也可以采用任何格式。COPY 语句通常使用三个 Kafka 特定解析器之一:

因为 Kafka 解析器可以识别流式传输数据中的记录边界,所以其他解析器(如 Flex 解析器)不与 KafkaSource 的输出直接兼容。必须使用筛选器更改 KafkaSource 输出,其他解析器才能处理数据。有关详细信息,请参阅解析自定义格式

在以下示例中,web_hits 中的数据采用 JSON 格式编码,因此它使用 KafkaJSONParser。此值在 COPY 语句的 PARSER 子句中进行设置:

COPY ...
SOURCE ...
PARSER KafkaJSONParser()

存储拒绝的数据

Vertica 将解析器无法解析的原始 Kafka 消息连同有关拒绝原因的信息保存到拒绝表中。该表由 COPY 语句创建。以下示例将拒绝内容保存到名为 web_hits_rejections 的表中。此值在 COPY 语句的 REJECTED DATA AS TABLE 子句中设置:

COPY ...
SOURCE ...
PARSER ...
REJECTED DATA AS TABLE public.web_hits_rejections;

将数据流加载到 Vertica 中

以下步骤使用前面几个部分中以增量方式构建的 COPY 语句加载 web_hits 主题中的 JSON 数据 10 秒:

  1. 执行 COPY 语句:

    => COPY web_table
       SOURCE KafkaSource(stream='web_hits|0|-2',
                          brokers='kafka01.example.com:9092',
                          duration=interval '10000 milliseconds')
       PARSER KafkaJSONParser()
       REJECTED DATA AS TABLE public.web_hits_rejections;
     Rows Loaded
    -------------
             800
    (1 row)
    
  2. 计算 Flex 表键:

    => SELECT compute_flextable_keys('web_table');
                  compute_flextable_keys
    --------------------------------------------------
     Please see public.web_table_keys for updated keys
    (1 row)
    

    有关更多详细信息,请参阅计算 Flex 表键

  3. 查询 web_table_keys 以返回这些键:

    => SELECT * FROM web_table_keys;
      key_name  | frequency | data_type_guess
    ------------+-----------+-----------------
     date       |       800 | Timestamp
     user_agent |       800 | Varchar(294)
     ip         |       800 | Varchar(30)
     url        |       800 | Varchar(88)
    (4 rows)
    
  4. 查询 web_table 以返回从 web_hits Kafka 主题加载的数据:

    => SELECT date, url, ip FROM web_table LIMIT 10;
            date         |                url                 |       ip
    ---------------------+------------------------------------+-----------------
     2021-10-15 02:33:31 | search/index.htm                   | 192.168.210.61
     2021-10-17 16:58:27 | wp-content/about.html              | 10.59.149.131
     2021-10-05 09:10:06 | wp-content/posts/category/faq.html | 172.19.122.146
     2021-10-01 08:05:39 | blog/wp-content/home.jsp           | 192.168.136.207
     2021-10-10 07:28:39 | main/main.jsp                      | 172.18.192.9
     2021-10-22 12:41:33 | tags/categories/about.html         | 10.120.75.17
     2021-10-17 09:41:09 | explore/posts/main/faq.jsp         | 10.128.39.196
     2021-10-13 06:45:36 | category/list/home.jsp             | 192.168.90.200
     2021-10-27 11:03:50 | category/posts/posts/index.php     | 10.124.166.226
     2021-10-26 01:35:12 | categories/search/category.htm     | 192.168.76.40
    (10 rows)
    

4.2 - 使用调度程序自动使用来自 Kafka 的数据

Vertica 提供了一个调度程序,可加载来自一个或多个 Kafka 主题的流消息。与手动使用 COPY 相比,自动加载流式传输数据有诸多优势:

  • 流数据会自动出现在数据库中。新数据出现在数据库中的频率由调度程序的时间范围持续时间控制。

  • 调度程序提供了一个只使用一次的过程。调度程序为您管理偏移量,以便 Kafka 发送的每条消息都会使用一次。

  • 可以将备份调度程序配置为提供高可用性。如果主调度程序由于某种原因失败,备用调度程序将自动接管数据加载。

  • 调度程序管理用于加载数据的资源。可通过分配给调度程序的资源池设置来控制调度程序对资源的使用情况。在手动加载时,必须考虑在加载时所使用的资源。

使用调度程序也有一些缺点,可能不适合您的需求。您可能会发现调度程序无法提供加载过程所需的灵活性。例如,调度程序无法在加载事务期间执行业务逻辑。如果需要执行这种处理,最好创建自己的加载过程。此过程会定期运行 COPY 语句来加载 Kafka 中的数据。然后,它会在提交事务之前执行所需的业务逻辑处理。

有关作业调度程序要求的信息,请参考 Apache Kafka 集成

作业调度程序的功能

调度程序负责安排从 Kafka 加载数据。调度程序的基本处理单元是一个时间范围,也就是一段时间。在每一时间范围内,调度程序为要运行的每个活动微批处理分配一段时间。每个微批处理负责从单个来源加载数据。一旦时间范围结束,调度程序将开始下一时间范围。调度程序继续这个过程,直到您停止它为止。

调度程序剖析

每个调度程序都有多组设置,每组设置控制数据加载的一个方面。这些组包括:

  • 调度程序本身,用来定义配置架构、时间范围持续时间和资源池。

  • 群集,用来定义 Kafka 群集中与调度程序联系来加载数据的数据。每个调度程序可以包含多个群集,从而允许使用单个调度程序来从多个 Kafka 群集加载数据。

  • 源,用来定义 Kafka 主题及这些主题中从中读取数据的分区。

  • 目标,用来定义 Vertica 中将接收数据的表。这些表可以是传统的 Vertica 数据库表,也可以是 Flex 表。

  • 加载规范,用来定义 Vertica 在加载数据时使用的设置。这些设置包括 Vertica 加载数据时需要使用的解析器和筛选器。例如,如果要读取采用 Avro 格式的 Kafka 主题,则加载规范需要指定 Avro 解析器和架构。

  • 微批处理,用来 表示来自 Kafka 流的数据负载的各个分段。它们结合了您使用其他 vkconfig 工具创建的群集、源、目标和负载规范的定义。该调度程序使用微批处理中的所有信息执行 COPY 语句,并使用 KafkaSource UDL 函数将数据从 Kafka 传输到 Vertica。每个微批处理负载的统计信息均存储在 stream_microbatch_history 表中。

vkconfig 脚本

可以使用名为 vkconfig 的 Linux 命令行脚本来创建、配置和运行调度程序。此脚本安装在 Vertica 主机及 Vertica 服务器上的以下路径:

/opt/vertica/packages/kafka/bin/vkconfig

vkconfig 脚本包含多个工具。Vkconfig 脚本的第一个实参始终为您希望使用的工具。每个工具执行一个功能,例如更改一组设置(如群集或源)或启动和停止调度程序。例如,要创建或配置调度程序,可以使用以下命令:

$ /opt/vertica/packages/kafka/bin/vkconfig scheduler other options...

创建调度程序时会发生什么

在创建新调度程序时,vkconfig 脚本执行以下步骤:

  • 使用为调度程序指定的名称创建一个新的 Vertica 架构。可以在配置期间使用此名称标识调度程序。

  • 在新创建的架构中创建为了管理 Kafka 数据加载而所需的表。有关详细信息,请参阅数据流式传输架构表

验证调度程序

在创建或配置调度程序时,调度程序会验证以下设置:

  • 确认指定群集中的所有代理都存在。

  • 连接到指定的主机,并检索 Kafka 群集中所有代理的列表。获取此列表始终确保调度程序拥有所有代理的最新列表。如果主机是已经定义的群集的一部分,则调度程序将取消配置。

  • 确认指定的源是否存在。如果该源不再存在,则禁用该源。

  • 检索源中的分区数目。如果从源检索到的分区数与调度程序保存的分区值不同,Vertica 将使用从群集中的源检索到的分区数更新该调度程序。

可以使用 vkconfig 脚本的调度程序工具中的 --validation-type 选项禁用验证。有关详细信息,请参阅调度程序工具选项

同步调度程序

默认情况下,调度程序自动与 Kafka 主机群集同步其配置和源信息。可以使用 --config-refresh 调度程序实用程序选项配置同步间隔。在每个间隔,调度程序都会:

  • 通过在其 Vertica 配置架构中查询其设置,检查调度程序的配置有无更新。

  • 执行验证调度程序中列出的所有检查。

可以使用 vkconfig 脚本的调度程序工具中的 --auto-sync 选项配置同步设置。 调度程序工具选项 了解详细信息。

启动调度程序

可以使用 vkconfig 脚本的启动工具启动调度程序。

启动调度程序时,它会从源收集数据,从指定的偏移量开始。您可以查看 stream_microbatch_history 表,了解调度程序在任何给定时间执行的操作。

要了解如何创建、配置和启动调度程序,请参阅本指南中的设置调度程序

您也可以选择屏蔽调度程序。例如,您可能希望使用指定的偏移量范围执行一次加载。有关详细信息,请参阅本指南中的手动使用来自 Kafka 的数据

如果 Vertica 群集下线,则调度程序将尝试重新连接,但失败。当群集重新启动时,必须重新启动调度程序。

管理正在运行的调度程序

从命令行启动调度程序时,它在前台运行。它会一直运行,直到您终止(或主机关闭)。通常,您需要将调度程序作为守护程序进程来启动,该进程会在主机操作系统启动时或在 Vertica 数据库启动后启动调度程序。

使用 vkconfig 脚本的关闭工具关闭正在运行的调度程序。有关详细信息,请参阅关闭工具选项

可以在调度程序运行时更改它的大部分设置(例如,添加或更改群集、源、目标和微批处理)。调度程序会自动处理配置更新。

启动多个作业调度程序以实现高可用性

要实现高可用性,您可以面向同一个配置架构启动两个或多个相同的调度程序。可以使用启动工具的 --instance-name 选项来区分调度程序(请参阅启动工具选项)。

活动调度程序加载数据并维护 stream_lock 表上的 S 锁。未使用的调度程序将一直处于备用模式,直到活动调度程序失败或被禁用。如果活动调度程序失败,备用调度程序将立即获取 stream_lock 表上的锁,并从失败的调度程序离开的位置接管从 Kafka 的数据加载。

管理自动加载期间被拒绝的消息

Vertica 在自动加载期间使用解析器定义拒绝消息,这在微批处理加载规范中是必需操作。

调度程序将创建一个拒绝表来自动存储每个微批处理的被拒绝消息。要手动指定拒绝表,请在创建微批处理时使用 --rejection-schema--rejection-table微批处理实用程序选项。查询 stream_microbatches 表可返回微批处理的拒绝架构和拒绝表。

有关 Vertica 如何处理被拒绝数据的其他详细信息,请参阅处理杂乱的数据

将选项传递给调度程序的 JVM

调度程序使用 Java 虚拟机,通过 JDBC 连接到 Vertica。可以通过名为 VKCONFIG_JVM_OPTS 的 Linux 环境变量将命令行选项传递给 JVM。将调度程序配置为在连接 Vertica 时使用 TLS/SSL 加密时,此选项非常有用。有关详细信息,请参阅为调度程序配置 TLS 连接

从 MC 中查看调度程序

可以从 MC 中查看 Kafka 作业的状态。有关详细信息,请参考 查看加载历史记录

4.2.1 - 设置调度程序

您可以使用 Linux 命令行设置调度程序。通常,可以在要运行调度程序的主机上执行配置。该主机可以是您的 Vertica 主机之一,也可以是已安装 vkconfig 实用程序的其他主机(有关详细信息,请参阅 vkconfig 脚本)。

请按照以下步骤设置并启动调度程序以将数据从 Kafka 流式传输到 Vertica:

  1. 创建配置文件(可选)

  2. 将 Kafka Bin 目录添加到路径中(可选)

  3. 为调度程序创建资源池

  4. 创建调度程序

  5. 创建群集

  6. 创建数据表

  7. 创建源

  8. 创建目标

  9. 创建加载规范

  10. 创建微批处理

  11. 启动调度程序

上述步骤将在以下部分中进行说明。以下各部分将使用将 Web 日志数据(网站点击量)从 Kafka 加载到 Vertica 表的示例。

创建配置文件(可选)

您在创建调度程序时提供给 vkconfig 脚本的许多实参都不会改变。例如,您经常需要将用户名和密码传递给 Vertica 以授权对数据库进行更改。在每次调用 vkconfig 时添加用户名和密码既繁琐且易出错。

您可以改为使用为您指定这些实参的 --conf 选项向 vkconfig 实用程序传递配置文件。这样可避免大量的打字工作,降低挫败感。

配置文件是一个文本文件,其中每行均包含“关键字=”对。每个关键字都是一个 vkconfig 命令行选项,例如常用 vkconfig 脚本选项中列出的选项。

以下示例显示了名为 weblog.conf 的配置文件,该文件将用于定义名为 weblog_sched 的调度程序。此示例的剩余部分都会使用此配置文件。


# The configuraton options for the weblog_sched scheduler.
username=dbadmin
password=mypassword
dbhost=vertica01.example.com
dbport=5433
config-schema=weblog_sched

将 vkconfig 目录添加到路径中(可选)

vkconfig 脚本位于 /opt/vertica/packages/kafka/bin 目录中。在每次调用 vkconfig 时键入此路径比较繁琐。您可以使用以下命令将 vkconfig 添加到当前 Linux 会话的搜索路径中:

$ export PATH=/opt/vertica/packages/kafka/bin:$PATH

对于会话的剩余部分,您可以调用 vkconfig 而无需指定其整个路径:

$ vkconfig
Invalid tool
Valid options are scheduler, cluster, source, target, load-spec, microbatch, sync, launch,
shutdown, help

如果要使此设置永久有效,请将导出语句添加到 ~/.profile 文件中。此示例的剩余部分假设您已将此目录添加到 shell 的搜索路径中。

为调度程序创建资源池

Vertica 建议您始终专门为每个调度程序创建一个 资源池。调度程序假定自身独占使用分配的资源池。为调度程序使用单独的池,有助于优化它对 Vertica 群集性能的影响。您可以使用 CREATE RESOURCE POOL 语句在 Vertica 中创建资源池。

以下资源池设置在创建调度程序的资源池时发挥着重要作用:

  • PLANNEDCONCURRENCY 决定调度程序同时发送给 Vertica 的微批处理(COPY 语句)的数量。

  • EXECUTIONPARALLELISM 决定每个节点为处理微批处理分区而创建的最大线程数。

  • QUEUETIMEOUT 提供对资源计时的手动控制。将此参数设置为 0 即可允许调度程序管理计时。

有关这些设置以及如何为调度程序微调资源池的详细信息,请参阅管理调度程序资源和性能

以下 CREATE RESOURCE POOL 语句将创建可加载 1 个微批处理并处理 1 个分区的资源池:

=> CREATE RESOURCE POOL weblog_pool
    MEMORYSIZE '10%'
    PLANNEDCONCURRENCY 1
    EXECUTIONPARALLELISM 1
    QUEUETIMEOUT 0;

如果没有为调度程序创建和分配资源池,它将使用 GENERAL 资源池的一部分。Vertica 建议您不要将 GENERAL 池用于生产环境中使用的调度程序。这种回退到使用 GENERAL 池的举措,是为了在测试调度程序配置期间提供方便。当您准备好部署调度程序时,请创建一个您已根据特定需求进行调整的资源池。每次启动使用 GENERAL 池的调度程序时,vkconfig 实用程序都会显示一条警告消息。

如果没有为调度程序分配足够的资源,则有可能会导致错误。例如,如果调度程序无法从预期的数据帧中加载所有主题的数据,则可能会显示错误:OVERSHOT DEADLINE FOR FRAME。

有关资源池的详细信息,请参阅资源池架构

创建调度程序

Vertica 包含名为 stream_config 的默认调度程序。您可以使用此调度程序,或者使用 vkconfig 脚本的调度程序工具以及 --create--config-schema 选项来创建新的调度程序:

$ vkconfig scheduler --create --config-schema scheduler_name --conf conf_file

添加使用默认选项的调度程序,只需使用 --create--config-schema 选项即可。此命令会在 Vertica 中创建一个包含调度程序配置的新架构。有关创建调度程序架构的详细信息,请参阅创建调度程序时发生的情况

您也可使用其他配置参数进一步自定义您的调度程序。有关详细信息,请参阅调度程序工具选项

下面的例子:

  • 使用 --config-schema 选项创建名为 weblog_sched 的调度程序。

  • 使用 --operator 选项向名为 kafka_user 的 Vertica 用户授予配置和运行调度程序的权限。dbadmin 用户必须单独指定其他权限。

  • 使用 --frame-duration 选项指定七分钟的时间范围持续时间。有关选择时间范围持续时间的详细信息,请参阅选择时间范围持续时间

  • 将调度程序使用的资源池设置为之前创建的 weblog_pool:

$ vkconfig scheduler --create --config-schema weblog_sched --operator kafka_user \
  --frame-duration '00:07:00' --resource-pool weblog_pool --conf weblog.conf

创建群集

您必须将至少一个 Kafka 群集与调度程序相关联。调度程序可以访问多个 Kafka 群集。要创建群集,您可以提供群集名称以及 Kafka 群集代理的主机名和端口。

创建群集时,调度程序会尝试通过连接到 Kafka 群集来对其进行验证。如果连接成功,调度程序会自动检索群集中所有代理的列表。因此,您不必在 --hosts 参数中列出每个代理。

以下示例会创建一个名为 kafka_weblog 的群集,其中包含两个 Kafka 代理主机:example.com 域中的 kafka01 和 kafka03。Kafka 代理正在端口 9092 上运行。

$ vkconfig cluster --create --cluster kafka_weblog \
  --hosts kafka01.example.com:9092,kafka03.example.com:9092 --conf weblog.conf

有关详细信息,请参阅群集工具选项

创建源

接下来,至少创建一个供调度程序读取的源。源定义了调度程序将从中加载数据的 Kafka 主题以及该主题包含的分区数。

要创建源并将其与已配置的调度程序相关联,请使用 source 工具。创建源时,Vertica 会连接到 Kafka 群集以验证主题是否存在。因此,在创建源之前,请确保该主题已存在于 Kafka 群集中。由于 Vertica 会验证主题是否存在,因此您必须使用 --cluster 选项提供先前定义的群集名称。

以下示例会在上一步中创建的群集上为名为 web_hits 的 Kafka 主题创建源。此主题具有一个分区。

$ vkconfig source --create --cluster kafka_weblog --source web_hits --partitions 1 --conf weblog.conf

有关详细信息,请参阅源工具选项

创建数据表

在可以为调度程序创建目标之前,您必须在 Vertica 数据库中创建目标表。这是 Vertica 用于存储调度程序从 Kafka 加载的数据的表。您必须决定为目标创建的表类型:

  • 使用 CREATE TABLE 语句创建的标准 Vertica 数据库表。此类型的表可高效地存储数据。但是,您必须确保其列与您正在加载的 Kafka 主题中消息的数据格式相匹配。不能将复杂类型的数据加载到标准 Vertica 表中。

  • 使用 CREATE FLEXIBLE TABLE 创建的 Flex 表。Flex 表的效率低于标准 Vertica 数据库表。但是,它足够灵活,可以处理架构不断变化的数据。此外,还可以加载标准 Vertica 表无法加载的大多数复杂数据类型(例如地图和列表)。

此示例中的数据采用的是设定好的格式,因此最好使用标准 Vertica 表。以下示例会创建名为 web_hits 的表来保存四列数据。此表位于公共架构中。

=> CREATE TABLE web_hits (ip VARCHAR(16), url VARCHAR(256), date DATETIME, user_agent VARCHAR(1024));

创建目标

创建目标表后,您可以创建调度程序的目标。目标会告知调度程序在何处存储从 Kafka 检索到的数据。创建目标时,此表必须存在。您可以使用带有 --target-schema--target_table 选项的 vkconfig 脚本的目标工具来指定 Vertica 目标表的架构和名称。以下示例会为在上一步中创建的表添加目标。

$ vkconfig target --create --target-schema public --target-table web_hits --conf weblog.conf

有关详细信息,请参阅目标工具选项

创建加载规范

调度程序的加载规范提供 Vertica 在解析从 Kafka 加载的数据时使用的参数。最重要的选项是 --parser,可设置 Vertica 用于解析数据的解析器。您可以使用三个解析器选项:

在此示例中,正在从 Kafka 加载的数据采用 JSON 格式。以下命令会创建名为 weblog_load 的加载规范并将解析器设置为 KafkaJSONParser。

$ vkconfig load-spec --create --parser KafkaJSONParser --load-spec weblog_load --conf weblog.conf

有关详细信息,请参阅加载规范工具选项

创建微批处理

微批处理将合并到目前为止添加到调度程序的所有设置,以定义调度程序用于从 Kafka 加载数据的各个 COPY 语句。

以下示例使用前面示例中创建的所有设置来创建名为 weblog 的微批处理。

$ vkconfig microbatch --create --microbatch weblog --target-schema public --target-table web_hits \
           --add-source web_hits --add-source-cluster kafka_weblog --load-spec weblog_load \
           --conf weblog.conf

对于可能受益于事务大小减小的微批处理,请考虑在创建微批处理时使用 --max-parallelism 选项。此选项会将具有多个分区的单个微批处理拆分为由较少分区组成的指定数量的同步 COPY 语句。

有关 --max-parallelism 和其他选项的详细信息,请参阅微批处理工具选项

启动调度程序

创建至少一个微批处理后,就可以运行调度程序。您可以使用启动工具来启动调度程序,并将调度程序架构的名称传递给它。调度程序会开始为在其架构中定义的每个已启用微批处理调度微批处理加载。

以下示例会启动在前面步骤中定义的 weblog 调度程序。该调度程序会使用 nohup 命令来防止调度程序在用户注销时被终止,并重定向 stdout 和 stderr 来防止创建 nohup.out 文件。

$ nohup vkconfig launch --conf weblog.conf >/dev/null 2>&1 &

有关详细信息,请参阅启动工具选项

检查调度程序是否正在运行

启动调度程序后,您可以通过查询调度程序架构中的 stream_microbatch_history 表来验证它是否正在运行。此表列出了调度程序已运行的每个微批处理的结果。

例如,此查询列出了微批处理名称、微批处理的开始时间和结束时间、批处理的开始偏移量和结束偏移量,以及批处理结束的原因。结果从调度程序已启动时开始排序:

=> SELECT microbatch, batch_start, batch_end, start_offset,
          end_offset, end_reason
          FROM weblog_sched.stream_microbatch_history
          ORDER BY batch_start DESC LIMIT 10;

 microbatch |        batch_start         |         batch_end          | start_offset | end_offset |  end_reason
------------+----------------------------+----------------------------+--------------+------------+---------------
 weblog     | 2017-10-04 09:30:19.100752 | 2017-10-04 09:30:20.455739 |           -2 |      34931 | END_OF_STREAM
 weblog     | 2017-10-04 09:30:49.161756 | 2017-10-04 09:30:49.873389 |        34931 |      34955 | END_OF_STREAM
 weblog     | 2017-10-04 09:31:19.25731  | 2017-10-04 09:31:22.203173 |        34955 |      35274 | END_OF_STREAM
 weblog     | 2017-10-04 09:31:49.299119 | 2017-10-04 09:31:50.669889 |        35274 |      35555 | END_OF_STREAM
 weblog     | 2017-10-04 09:32:19.43153  | 2017-10-04 09:32:20.7519   |        35555 |      35852 | END_OF_STREAM
 weblog     | 2017-10-04 09:32:49.397684 | 2017-10-04 09:32:50.091675 |        35852 |      36142 | END_OF_STREAM
 weblog     | 2017-10-04 09:33:19.449274 | 2017-10-04 09:33:20.724478 |        36142 |      36444 | END_OF_STREAM
 weblog     | 2017-10-04 09:33:49.481563 | 2017-10-04 09:33:50.068068 |        36444 |      36734 | END_OF_STREAM
 weblog     | 2017-10-04 09:34:19.661624 | 2017-10-04 09:34:20.639078 |        36734 |      37036 | END_OF_STREAM
 weblog     | 2017-10-04 09:34:49.612355 | 2017-10-04 09:34:50.121824 |        37036 |      37327 | END_OF_STREAM
(10 rows)

4.2.2 - 选择时间范围持续时间

调度程序的一个关键设置是它的时间范围持续时间。持续时间设置调度程序运行您为其定义的所有微批处理的时间量。此设置对如何从 Apache Kafka 加载数据有显著的影响。

在每一时间范围期间会发生什么

要了解正确的时间范围持续时间,首先需要了解在每一时间范围期间发生了什么。

时间范围持续时间在添加到调度程序的微批处理之间分配。此外,每一时间范围都有一些开销,需要花时间处理微批处理。在每个微批处理中,也有一些开销,这减少了微批处理在从 Kafka 加载数据方面花费时间。下图大致显示了每一时间范围是如何划分的:

正如您所见,时间范围中只有一部分时间用于实际加载流式传输数据。

调度程序如何对微批处理排列优先顺序

首先,调度程序将时间范围中的时间均匀地分给各个微批处理。然后,它依次运行每个微批处理。

在每个微批处理中,专门花大量时间来通过 COPY 语句加载数据。此语句使用 KafkaSource UDL 加载数据。它会一直运行到发生以下两种情况之一:

  • 它到达为微批处理定义的主题和分区的数据流的末端。在这种情况下,微批处理提前完成处理。

  • 它到达调度程序设置的超时时间。

当调度程序处理时间范围时,它会记下哪些微批处理提前完成。然后,它会安排它们在下一时间范围首先运行。如果以这种方式安排微批处理,则可以让调度程序将时间范围中的更多时间分配给在加载数据方面花费最多时间的微批处理(或许还没有足够的时间到达其数据流的末端)。

例如,请考虑下图。在第一时间范围期间,调度程序在微批处理之间平均分配时间。微批处理 #2 使用分配给它的所有时间(如填充区域所示),而其他微批处理并未这样。在下一时间范围中,调度程序重新排列这些微批处理,使提前完成的微批处理首先进行。它还将更少的时间分配给运行时间较短的微批处理。假定这些微批处理再次提前完成,调度程序便能够将时间范围中的剩余时间分配给微批处理 #2。当调度程序运行时,这种优先级的转移将会继续进行。如果一个主题的流量激增,调度程序会为读取该主题的微批处理分配更多的时间作为补偿。

时间范围持续时间太短会发生什么

如果将调度程序的时间范围持续时间设置得太短,微批处理可能没有足够的时间来加载它们负责读取的数据流中的所有数据。在最坏的情况下,当在每一时间范围期间读取大容量主题时,微批处理可能会落后更多。如果不加以解决,这个问题可能会导致消息永远无法加载,因为在微批处理有机会读取它们之前,它们就已经在数据流中过期了。

在极端情况下,调度程序可能无法在每一时间范围期间中运行每一个微批处理。如果时间范围持续时间太短,以至于大部分时间都花在开销任务(如提交数据和准备运行微批处理)上,便会发生这个问题。每个微批处理为了从 Kafka 加载数据而运行的 COPY 语句的最短持续时间为 1 秒。再加上处理数据加载的开销。一般来说,如果时间范围持续时间短于 2 秒乘以调度程序中的微批处理数,那么一些微批处理可能会没有机会在每一时间范围中运行。

如果调度程序在一个时间范围期间运行每个微批处理超时,那么它在下一时间范围期间会通过优先安排前一时间范围中没有运行的微批处理来做出补偿。这个策略确保每个微批处理都有机会加载数据。但是,这并不能解决问题的根本原因。最好的解决方案是增加时间范围持续时间,为每个微批处理分配足够的时间来在每一时间范围期间加载数据。

时间范围持续时间太长会发生什么

长时间范围持续时间的一个缺点是增加了数据延迟。这个延迟是从 Kafka 发出数据到可在数据库中查询数据之间的时间。较长的时间范围持续时间意味着微批处理的每次执行之间有更多的时间。这意味着更新数据库中的数据之间的间隔时间更长。

这种延迟可能并不重要,具体取决于应用。在确定时间范围持续时间时,请考虑数据可能会延迟到整个时间范围持续时间是否将导致问题。

使用长时间范围持续时间时要考虑的另一个问题是,关闭调度程序所需的时间。只有在当前 COPY 语句完成之后,调度程序才会关闭。根据时间范围持续时间的长度,这个过程可能需要几分钟。

最短时间范围持续时间

为添加到调度程序的每个微批处理至少分配两秒钟。如果时间范围持续时间短于该下限,vkconfig 实用程序将会发出警告。在大多数情况下,您都希望时间范围持续时间更长。如果每个微批处理两秒钟,几乎没有时间来加载数据。

平衡时间范围持续时间要求

要确定部署的最佳时间范围持续时间,请考虑您对数据延迟的敏感程度。如果不是在对来自 Kafka 的数据流式传输执行时间敏感型查询,可以使用默认的 5 分钟甚至更长的时间范围持续时间。如果需要数据延迟更短,那么请考虑从 Kafka 读取的数据量。如果时间范围持续时间较短,那么大容量数据或者流量有明显峰值的数据可能会导致问题。

针对不同需求使用不同的调度程序

假定您要加载的流式传输数据来自少数您希望以低延迟查询的 Kafka 主题,以及其他具有大量数据但您可以承受更长时间延迟的主题。在这种情况下,选择“中间”时间范围持续时间可能无法满足任一需求。更好的解决方案是使用多个调度程序:创建两个调度程序,其中一个调度程序的时间范围持续时间较短,仅读取需要以低延迟查询的主题;另一个调度程序的时间范围持续时间较长,用于从大容量主题加载数据。

例如,假定您要通过 Kafka 将流式传输数据从物联网 (IOT) 传感器网络加载到 Vertica。您可以使用其中的大部分数据定期生成报告并更新仪表板显示。这两个用例对时间都不是特别敏感。但是,正在从中进行加载的三个主题确实包含时间敏感型数据(系统故障、入侵检测和连接中断),这些数据必须立即触发警报。

在这种情况下,可以创建两个调度程序,其中一个调度程序的时间范围持续时间为 5 分钟或更长,用于读取包含非关键数据的大多数主题;第二个调度程序的时间范围持续时间至少为 6 秒(但最好更长),用于仅加载来自三个时间敏感型主题的数据。希望这些主题中的数据量足够小,以至于短时间范围持续时间不会导致问题。

4.2.3 - 管理调度程序资源和性能

调度程序的性能受调度程序中的微批处理数、每个微批处理中的分区和 Vertica 群集中的节点影响。使用资源池为调度程序分配一部分系统资源,并微调这些资源,以优化 Vertica 的自动加载。

以下各节详细介绍了调度程序资源池配置和处理场景:

调度程序和资源池

Vertica 建议您始终专门为每个调度程序创建一个 资源池。调度程序假定自身独占使用分配的资源池。为调度程序使用单独的池,有助于优化它对 Vertica 群集性能的影响。您可以使用 CREATE RESOURCE POOL 语句在 Vertica 中创建资源池。

如果没有为调度程序创建和分配资源池,它将使用 GENERAL 资源池的一部分。Vertica 建议您不要将 GENERAL 池用于生产环境中使用的调度程序。这种回退到使用 GENERAL 池的举措,是为了在测试调度程序配置期间提供方便。当您准备好部署调度程序时,请创建一个您已根据特定需求进行调整的资源池。每次启动使用 GENERAL 池的调度程序时,vkconfig 实用程序都会显示一条警告消息。

如果没有为调度程序分配足够的资源,则有可能会导致错误。例如,如果调度程序无法从预期的数据帧中加载所有主题的数据,则可能会显示错误:OVERSHOT DEADLINE FOR FRAME。

有关资源池的详细信息,请参阅资源池架构

关键资源池设置

微批处理是一个工作单元,它在一个时间范围持续时间内处理单个 Kafka 主题的分区。以下资源池设置在 Vertica 如何加载微批处理和处理分区中发挥着重要作用:

  • PLANNEDCONCURRENCY 决定调度程序同时发送给 Vertica 的微批处理(COPY 语句)的数量。在每一时间范围开始时,调度程序都会创建 PLANNEDCONCURRENCY 指定的调度程序线程数。每个调度程序线程连接到 Vertica,每次加载一个微批处理。如果微批处理数大于调度程序线程数,调度程序将额外的微批处理放入队列,并在线程变得可用时加载它们。
  • EXECUTIONPARALLELISM 决定每个节点用于处理微批处理分区的最大线程数。当一个微批处理加载到 Vertica 时,它的分区会均匀地分布在群集中的节点之间。在每一时间范围期间,节点最多为每个分区创建一个线程。每个线程一次从一个分区读取数据,直到处理完成或该时间范围结束。如果分区数大于所有节点上的线程数,则在线程可用时处理剩余的分区。
  • QUEUETIMEOUT 提供对资源计时的手动控制。将资源池参数 QUEUETIMEOUT 设置为 0 即可允许调度程序管理计时。在所有的微批处理都得到处理后,调度程序将等待该时间范围的剩余时间来处理下一个微批处理。具有适当大小的配置包括为了应对流量高峰而规划的休息时间。有关时间范围持续时间大小所带来影响的信息,请参阅选择时间范围持续时间

例如,以下 CREATE RESOURCE POOL 语句将创建一个名为 weblogs_pool 的资源池,它同时加载 2 个微批处理。Vertica 群集中的每个节点为每个微批处理创建 10 个线程来处理分区:

=> CREATE RESOURCE POOL weblogs_pool
    MEMORYSIZE '10%'
    PLANNEDCONCURRENCY 2
    EXECUTIONPARALLELISM 10
    QUEUETIMEOUT 0;

对于三节点 Vertica 群集,weblogs_pool 为每个节点提供的资源将最多创建 10 个线程来处理分区,或者每个微批处理最多总共 30 个线程。

并发加载多个微批处理

在某些情况下,调度程序中的微批处理可能比可用的 PLANNEDCONCURRENCY 多。下面的几张图说明了当没有足够的调度程序线程同时加载每个微批处理时,调度程序如何将微批处理加载到单个 Vertica 节点中。虽然资源池的 PLANNEDCONCURRENCY (PC) 设置为 2,但是调度程序必须加载三个微批处理:A、B 和 C。为简单起见,将 EXECUTIONPARALLELISM (EP) 设置为 1。

首先,调度程序加载微批处理 A 和微批处理 B,而微批处理 C 等待:

加载第一组微批处理。

当任何一个微批处理完成加载后,调度程序加载任何剩余的微批处理。在下图中,微批处理 A 已完全加载到 Vertica 中。调度程序继续加载微批处理 B,并使用新的可用调度程序线程加载微批处理 C:

加载剩余的微批处理。

调度程序继续发送数据,直到将所有微批处理加载到 Vertica 中,或该时间范围结束。

试用 PLANNEDCONCURRENCY 来优化性能。请注意,设置得过高可能会在每一时间范围的开始创建太多的连接,从而导致 Vertica 或 Kafka 的可扩展性压力。将 PLANNEDCONCURRENCY 设置得过低则不能充分利用 Vertica 的多处理能力。

Vertica 中的并行处理

资源池设置 EXECUTIONPARALLELISM 限制每个 Vertica 节点为处理分区创建的线程数。下图演示了在 EXECUTIONPARALLELISM 不足以为每个分区创建一个线程时,三节点 Vertica 群集如何处理具有九个分区的主题。这些分区均匀地分布在 Vertica 群集中的节点 1、节点 2 和节点 3 之间。调度程序的资源池将 PLANNEDCONCURRENCY (PC) 设置为 1,将 EXECUTIONPARALLELISM (EP) 设置为 2,因此当调度程序加载微批处理 A 时,每个节点最多创建 2 个线程。每个线程一次从一个分区进行读取。没有为其分配线程的分区必须等待处理:

使用可用线程处理分区。

当线程处理完为其分配的分区后,剩下的分区将在线程可用时分配给线程:

当线程可用时处理剩余分区。

在调度程序的资源池上设置 EXECUTIONPARALLELISM 时,请考虑调度程序中所有微批处理的分区数。

并发加载分区主题

具有多个分区的单个主题可能会从增加并行加载或降低事务大小中受益。使用 --max-parallelism微批处理,可以动态地将具有多个分区的主题拆分为多个负载均衡的微批处理,每个微批处理均由原始微批处理的分区的子集组成。调度程序使用其资源池中可用的 PLANNEDCONCURRENCY 来同时加载动态拆分的微批处理。

调度程序资源池中的 EXECUTIONPARALLELISM 设置决定每个节点为处理其单个微批处理的部分分区而创建的最大线程数。拆分微批处理使每个节点都能够为同一工作单元创建更多的线程。当有足够的 PLANNEDCONCURRENCY 并且每个节点分配的分区数大于调度程序资源池中的 EXECUTIONPARALLELISM 设置时,使用 --max-parallelism 拆分微批处理并在每个节点上创建更多线程来并行处理更多分区。

下图演示了双节点 Vertica 群集如何使用 PLANNEDCONCURRENCY (PC) 和 EXECUTIONPARALLELISM (EP) 均设置为 2 的资源池来加载和处理微批处理 A。因为调度程序只加载一个微批处理,所以有 1 个调度程序线程未使用。每个节点为每个调度程序线程创建 2 个线程来处理分配给它的分区:

在不使用 max-parallelism 选项时加载。

将微批处理 A 的 --max-parallelism 选项设为 2 将使调度程序可以动态地将微批处理 A 拆分为 2 个更小的微批处理 A1 和 A2。因为有 2 个调度程序线程可用,所以子集微批处理会同时加载到 Vertica 中。每个节点为每个调度程序线程创建 2 个线程来处理微批处理 A1 和 A2 的分区:

使用 max-parallelism 选项加载。

使用 --max-parallelism 防止由大容量 Kafka 主题组成的微批处理中出现瓶颈。它还为需要额外处理(如文本索引)的微批处理提供更快的加载。

4.2.4 - 对 Kafka 调度程序使用连接负载均衡

--dbhost 选项或配置文件中的 dbhost 条目中向调度程序提供 Vertica 节点的名称。调度程序连接到此节点以初始化它用于从 Kafka 加载数据而执行的所有语句。例如,每次执行微批处理时,调度程序都会连接到同一节点来运行 COPY 语句。如果使用单个节点作为所有调度程序操作的启动程序节点,这会影响该节点的性能,进而影响整个数据库的性能。

为了避免单个节点成为瓶颈,可以使用连接负载均衡将运行调度程序语句的负载分散到数据库中的多个节点上。连接负载均衡在负载均衡组内的节点之间分配客户端连接。有关此功能的概述,请参阅关于本机连接负载均衡

为调度程序启用连接负载均衡是一个两步骤过程:

  1. 为调度程序选择或创建负载均衡策略。

  2. 在调度程序中启用负载均衡。

为调度程序选择或创建负载均衡策略

连接负载均衡策略将来自特定网络地址集的传入连接重定向到一组节点。如果您的数据库已定义合适的负载均衡策略,那么可以直接使用它,而不是专门为调度程序创建一个。

如果数据库没有合适的策略,请创建一个。让您的策略将来自运行 Kafka 调度程序的主机的 IP 地址的连接重定向到数据库中的一组节点。您选择的节点组将充当调度程序所执行语句的启动程序。

以下示例演示了如何为三节点数据库中的所有三个节点设置负载均衡策略。调度程序在数据库中的节点 1 上运行,因此路由规则的源地址范围 (192.168.110.0/24) 包含数据库中节点的 IP 地址。示例的最后一步验证是否已对来自第一个节点(IP 地址 10.20.110.21)的连接进行负载均衡。

=> SELECT node_name,node_address,node_address_family FROM v_catalog.nodes;
    node_name     | node_address | node_address_family
------------------+--------------+----------------------
 v_vmart_node0001 | 10.20.110.21 | ipv4
 v_vmart_node0002 | 10.20.110.22 | ipv4
 v_vmart_node0003 | 10.20.110.23 | ipv4
(4 rows)


=> CREATE NETWORK ADDRESS node01 ON v_vmart_node0001 WITH '10.20.110.21';
CREATE NETWORK ADDRESS
=> CREATE NETWORK ADDRESS node02 ON v_vmart_node0002 WITH '10.20.110.22';
CREATE NETWORK ADDRESS
=> CREATE NETWORK ADDRESS node03 on v_vmart_node0003 WITH '10.20.110.23';
CREATE NETWORK ADDRESS

=> CREATE LOAD BALANCE GROUP kafka_scheduler_group WITH ADDRESS node01,node02,node03;
CREATE LOAD BALANCE GROUP
=> CREATE ROUTING RULE kafka_scheduler_rule ROUTE
   '10.20.110.0/24' TO kafka_scheduler_group;
CREATE ROUTING RULE
=> SELECT describe_load_balance_decision('10.20.110.21');
                     describe_load_balance_decision
--------------------------------------------------------------------------------
  Describing load balance decision for address [10.20.110.21]
Load balance cache internal version id (node-local): [2]
Considered rule [kafka_scheduler_rule] source ip filter [10.20.110.0/24]...
input address matches this rule
Matched to load balance group [kafka_scheduler_group] the group has policy [ROUNDROBIN]
number of addresses [3]
(0) LB Address: [10.20.110.21]:5433
(1) LB Address: [10.20.110.22]:5433
(2) LB Address: [10.20.110.23]:5433
Chose address at position [1]
Routing table decision: Success. Load balance redirect to: [10.20.110.23] port [5433]

(1 row)

在调度程序中启用负载均衡

客户端必须选择加入负载均衡,Vertica 才能将连接负载均衡策略应用到连接。例如,必须将 -C 标记传递给 vsql 命令,才能对您的交互式会话实现负载均衡。

调度程序使用 Java JDBC 库连接到 Vertica。要让调度程序选择加入负载均衡,必须将 JDBC 库的 ConnectionLoadBalance 选项设置为 1。有关详细信息,请参阅JDBC 中的负载均衡

使用 vkconfig 脚本的 --jdbc-opt 选项,或者将 jdbc-opt 选项添加到配置文件中以设置 ConnectionLoadBalance 选项。例如,要使用名为 weblog.conf 的配置文件从命令行启动调度程序,请使用以下命令:

$ nohup vkconfig launch --conf weblog.conf --jdbc-opt ConnectionLoadBalance=1 >/dev/null 2>&1 &

要永久启用负载均衡,可以将负载均衡选项添加到配置文件中。以下示例显示了设置调度程序内配置为使用连接负载均衡的示例中的 weblog.conf 文件。

username=dbadmin
password=mypassword
dbhost=10.20.110.21
dbport=5433
config-schema=weblog_sched
jdbc-opt=ConnectionLoadBalance=1

可通过查询 SESSIONS 表来检查是否正在对调度程序的连接进行负载均衡:

=> SELECT node_name, user_name, client_label FROM V_MONITOR.SESSIONS;
    node_name     | user_name |               client_label
------------------+-----------+-------------------------------------------
 v_vmart_node0001 | dbadmin   | vkstream_weblog_sched_leader_persistent_4
 v_vmart_node0001 | dbadmin   |
 v_vmart_node0002 | dbadmin   | vkstream_weblog_sched_lane-worker-0_0_8
 v_vmart_node0003 | dbadmin   | vkstream_weblog_sched_VDBLogger_0_9
(4 rows)

在 client_labels 列中,调度程序的连接具有以 vkstream 开头的标签(没有客户端标签的行是交互式会话)。可以看到调度程序打开的三个连接都进入了不同的节点。

4.2.5 - 使用偏移量限制加载

Kafka 会维护用户可配置的消息积压。默认情况下,新创建的调度程序读取 Kafka 主题中的所有消息,包括积压的所有消息,而不仅仅是调度程序启动后流出的消息。通常,您想要的就是这样。

然而,在某些情况下,您可能只想将源的一部分流传输到表中。例如,假定您想要分析您的电子商务站点在特定日期和时间的 Web 流量。然而,您的 Kafka 主题包含比您想要分析的时间更早的 Web 访问记录。在这种情况下,可以使用偏移量,将需要的数据流传输到 Vertica 进行分析。

另一个常见的用例是当您已经从 Kafka 手动加载了一些数据时(请参阅手动使用来自 Kafka 的数据)。现在,您希望流传输新到达的所有数据。默认情况下,调度程序会重新加载之前加载的所有数据(假定仍然可以从 Kafka 获得)。此时,可以使用偏移量来告诉调度程序从手动数据加载停止的位置开始自动加载数据。

配置调度程序从某个偏移量开始流传输

vkconfig 脚本的微批处理工具有一个 --offset 选项,使用它可以指定希望调度程序开始加载的源中消息的索引。此选项接受逗号分隔的索引值列表。除非使用 --partition 选项,否则必须为源中的每个分区提供一个索引值。此选项可用于选择应用偏移量的分区。在微批处理中设置偏移量时,调度程序不能运行。

如果微批处理定义了多个群集,请使用 --cluster 选项选择偏移量选项适用于哪个群集。类似地,如果微批处理有多个源,则必须使用 --source 选项选择一个。

例如,假定您只想从一个名为 web_hits 的源加载最后 1000 条消息。为了方便起见,假定源只包含单个分区,而微批处理只定义了单个群集和单个主题。

第一个任务是确定流结尾的当前偏移量。可以在一个 Kafka 节点上通过调用 GetOffsetShell 类执行此操作,将时间参数设置为 -1(主题的结尾):

$ path to kafka/bin/kafka-run-class.sh kafka.tools.GetOffsetShell \
                                          --broker-list kafka01:9092,kafka03:9092 --time -1 \
                                          --topic web_hits

{metadata.broker.list=kafka01:9092,kafka03:9092, request.timeout.ms=1000,
 client.id=GetOffsetShell, security.protocol=PLAINTEXT}
web_hits:0:8932

还可以使用 GetOffsetShell 查找流中出现在时间戳之前的偏移量。

在上面的示例中,web_hits 主题的单个分区的结尾偏移量为 8932。如果我们希望加载来自源的最后 1000 条消息,我们需要将微批处理的偏移量设置为 8932 - 1001 或 7931。

计算偏移量后,即可在微批处理配置中设置了。下面的例子:

  • 关闭调度程序,其配置信息存储在 weblog.conf 文件中。

  • 使用微批处理实用程序设置起始偏移量。

  • 重新启动调度程序。

$ vkconfig shutdown --conf weblog.conf
$ vkconfig microbatch --microbatch weblog --update --conf weblog.conf --offset 7931
$ nohup vkconfig launch --conf weblog.conf >/dev/null 2>&1 &

如果目标表为空,或者在调度程序启动之前被截断,那么表中将有 1000 行(直到通过源流传输更多的消息):

=> select count(*) from web_hits;
 count
-------
  1000
(1 row)

4.2.6 - 在 Vertica 升级后更新调度程序

调度程序仅与创建它的 Vertica 版本兼容。在 Vertica 版本之间,调度程序的配置架构或调度程序调用的 UDx 函数可能会发生更改。在升级 Vertica 后,必须更新调度程序才能收到这些更改。

当您将 Vertica 升级到新的主要版本或服务包时,使用 vkconfig 调度程序工具的 --upgrade 选项来更新调度程序。如果不更新调度程序,则在尝试启动它时将收到一条错误消息。例如:

$ nohup vkconfig launch --conf weblog.conf >/dev/null 2>&1 &
com.vertica.solutions.kafka.exception.FatalException: Configured scheduler schema and current
scheduler configuration schema version do not match. Upgrade configuration by running:
vkconfig scheduler --upgrade
    at com.vertica.solutions.kafka.scheduler.StreamCoordinator.assertVersion(StreamCoordinator.java:64)
    at com.vertica.solutions.kafka.scheduler.StreamCoordinator.run(StreamCoordinator.java:125)
    at com.vertica.solutions.kafka.Launcher.run(Launcher.java:205)
    at com.vertica.solutions.kafka.Launcher.main(Launcher.java:258)
Scheduler instance failed. Check log file. Check log file.
$ vkconfig scheduler --upgrade --conf weblog.conf
Checking if UPGRADE necessary...
UPGRADE required, running UPGRADE...
UPGRADE completed successfully, now the scheduler configuration schema version is v8.1.1
$ nohup vkconfig launch --conf weblog.conf >/dev/null 2>&1 &
                   .  .  .

4.3 - 监控消息使用情况

可以通过多种方式监控来自 Kafka 的数据流式传输的进度:

  • 监控 Vertica 向其报告进度的使用者组。如果要用于监控数据加载情况的工具能够与 Kafka 协调使用,那么这种技术最好。

  • 使用 vkconfig 工具中内置的监控 API。这些 API 以 JSON 格式报告流式传输调度程序的配置和使用情况。如果您正在开发自己的监控脚本,或者您的监控工具可以使用 JSON 格式的状态信息,这些 API 会非常有用。

4.3.1 - 通过使用者组监控 Vertica 消息使用情况

Apache Kafka 具有名为使用者组的功能,可帮助在多组使用者之间分配消息使用负载。采用使用者组时,Kafka 会根据组中的使用者数量来平均分配消息。使用者会向 Kafka 代理报告已成功读取的消息。此报告可帮助 Kafka 管理主题分区中的消息偏移量,以便不会向组中的任何使用者发送两次相同的消息。

在管理负载分配或防止重复加载消息方面,Vertica 不依赖于 Kafka 的使用者组。流式传输作业调度程序会自行管理主题分区偏移量。

即使 Vertica 不需要使用者组来管理偏移量,它也会向 Kafka 代理报告已使用的消息。借助此功能,您可以在 Vertica 群集加载消息时使用第三方工具监控其进度。默认情况下,Vertica 会向名为 vertica-databaseName(其中 databaseName 是 Vertica 数据库的名称)的使用者组报告其进度。在定义调度程序时或手动加载数据期间,可以更改 Vertica 向其报告进度的使用者组的名称。在加载数据时,第三方工具可以让 Kafka 代理监控 Vertica 群集的进度。

例如,可以使用 Kafka 的 kafka-consumer-groups.sh 脚本(位于 Kafka 安装的 bin 目录中)查看 Vertica 使用者组的状态。以下示例演示了如何列出 Kafka 群集中定义的可用使用者组并显示 Vertica 使用者组的详细信息:

$ cd /path/to/kafka/bin
$ ./kafka-consumer-groups.sh --list --bootstrap-server localhost:9092
Note: This will not show information about old Zookeeper-based consumers.

vertica-vmart
$ ./kafka-consumer-groups.sh --describe --group vertica-vmart \
   --bootstrap-server localhost:9092
Note: This will not show information about old Zookeeper-based consumers.

Consumer group 'vertica-vmart' has no active members.

TOPIC                          PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG        CONSUMER-ID                                       HOST                           CLIENT-ID
web_hits                       0          24500           30000           5500       -                                                 -                              -

从输出中,可以看到 Vertica 向 vertica-vmart 使用者组报告消息使用情况。此组是 Vertica 加载示例 VMart 数据库时的默认使用者组。第二个命令列出了 vertica-vmart 使用者组正在使用的主题。可以看到 Vertica 群集已读取主题唯一分区中的 24500 条消息(总共 30000 条)。稍后,运行相同的命令将显示 Vertica 群集的进度:

$ cd /path/to/kafka/bin
$ ./kafka-consumer-groups.sh --describe --group vertica-vmart \
    --bootstrap-server localhost:9092
Note: This will not show information about old Zookeeper-based consumers.

Consumer group 'vertica-vmart' has no active members.

TOPIC                          PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG        CONSUMER-ID                                       HOST                           CLIENT-ID
web_hits                       0          30000           30000           0          -

更改 Vertica 向其报告进度的使用者组

可以更改 Vertica 在使用消息时向其报告进度的使用者组。

使用调度程序自动加载时进行更改

使用调度程序时,可通过将 --consumer-group-id 实参设为 vkconfig 脚本的调度程序微批处理实用程序来设置使用者组。例如,假设您希望设置调度程序中显示的示例调度程序向名为 vertica-database 的使用者组报告其使用情况。那么,可以使用以下命令:

$ /opt/vertica/packages/kafka/bin/vkconfig microbatch --update \
    --conf weblog.conf --microbatch weblog --consumer-group-id vertica-database

当调度程序开始加载数据时,它将开始更新新的使用者组。使用 kafka-consumer-groups.sh 可以在 Kafka 节点上看到此新使用者组。

使用 --list 选项可返回使用者组:

$ /path/to/kafka/bin/kafka-consumer-groups.sh --list --bootstrap-server localhost:9092
Note: This will not show information about old Zookeeper-based consumers.

vertica-database
vertica-vmart

使用 --describe--group 选项可返回有关特定使用者组的详细信息:

$ /path/to/kafka/bin/kafka-consumer-groups.sh --describe --group vertica-database \
                                          --bootstrap-server localhost:9092
Note: This will not show information about old Zookeeper-based consumers.

Consumer group 'vertica-database' has no active members.

TOPIC                          PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG        CONSUMER-ID                                       HOST                           CLIENT-ID
web_hits                       0          30300           30300           0          -                                                 -                              -

手动加载时进行更改

要在手动加载数据时更改使用者组,请使用 KafkaSource 函数的 group_id 参数:

=> COPY web_hits SOURCE KafkaSource(stream='web_hits|0|-2',
                                    brokers='kafka01.example.com:9092',
                                    stop_on_eof=True,
                                    group_id='vertica_database')
                 PARSER KafkaJSONParser();
 Rows Loaded
-------------
       50000
(1 row)

加载消息时采用使用者组偏移量

可以选择让调度程序、手动加载或自定义加载脚本从使用者组的偏移量处开始加载消息。要从存储在使用者组中的最后偏移量处加载消息,请使用特殊的偏移量 -3。

使用调度程序自动加载的示例

要指示调度程序从使用者组已保存的偏移量处加载消息,请使用 vkconfig 脚本微批处理工具的 --offset 实参。

  1. 停止使用 shutdown 命令的调度程序以及用于创建调度程序的配置文件:

    $ /opt/vertica/packages/kafka/bin/vkconfig microbatch shutdown --conf weblog.conf
    
  2. 将微批处理 --offset 选项设为 -3:

    $ /opt/vertica/packages/kafka/bin/vkconfig microbatch --update --conf weblog.conf --microbatch weblog --offset -3
    

这会将调度程序从其读取的所有主题分区的偏移量设置为 -3。调度程序将采用使用者组已保存的偏移量开始下一次加载,并且所有后续加载均将使用保存在 stream_microbatch_history 中的偏移量。

手动加载示例

以下示例将从 web_hits 主题加载消息,该主题具有包含 51,000 条消息的分区。有关使用 KafkaSource 进行手动加载的详细信息,请参阅手动使用来自 Kafka 的数据

  1. 第一个 COPY 语句将创建名为 vertica_manual 的使用者组,并从 web_hits 主题中的第一个分区加载前 50,000 条消息:

    => COPY web_hits
       SOURCE KafkaSource(stream='web_hits|0|0|50000',
                                  brokers='kafka01.example.com:9092',
                                  stop_on_eof=True,
                                  group_id='vertica_manual')
       PARSER KafkaJSONParser()
       REJECTED DATA AS TABLE public.web_hits_rejections;
     Rows Loaded
    -------------
           50000
    (1 row)
    
  2. 下一个 COPY 语句将传递 -3 作为 start_offset 流参数,以便从使用者组已保存的偏移量处进行加载:

    => COPY web_hits
       SOURCE KafkaSource(stream='web_hits|0|-3',
                                  brokers='kafka01.example.com:9092',
                                  stop_on_eof=True,
                                  group_id='vertica_manual')
       PARSER KafkaJSONParser()
       REJECTED DATA AS TABLE public.web_hits_rejections;
     Rows Loaded
    -------------
            1000
    (1 row)
    

禁用使用者组报告

默认情况下,Vertica 会向 Kafka 报告其使用的消息的偏移量。如果没有专门为 Vertica 配置使用者组,它仍会向名为 vertica_database-name(其中 database- name 是 Vertica 当前正在运行的数据库的名称)的使用者组报告其偏移量。

如果要完全禁止 Vertica 向 Kafka 报告其使用情况,可以将使用者组设为空字符串或 NULL。例如:

=> COPY web_hits SOURCE KafkaSource(stream='web_hits|0|-2',
                                    brokers='kafka01.example.com:9092',
                                    stop_on_eof=True,
                                    group_id=NULL)
                 PARSER KafkaJsonParser();
 Rows Loaded
-------------
       60000
(1 row)

4.3.2 - 从 vkconfig 获取配置和统计信息

vkconfig 工具有两个可帮助检查调度程序的配置并监控数据加载情况的功能:

  • 配置调度程序(调度程序、群集、源、目标、加载规范和微批处理)的 vkconfig 工具具有 --read 实参,可让这些工具在调度程序中输出其当前设置。

  • vkconfig 统计信息工具可用于获取有关微批处理的统计信息。根据日期和时间范围、群集、分区及其他条件,可以筛选微批处理记录。

上述两个功能均以 JSON 格式输出数据。可以使用能够使用 JSON 数据的第三方工具或编写自己的脚本来处理配置和统计信息数据。

此外,还可以通过查询调度程序架构中的配置表来访问这些 vkconfig 选项提供的数据。但是,您可能会发现这些选项变得更加易于使用,因为它们不需要您连接到 Vertica 数据库。

获取配置信息

--read 选项传递到 vkconfig 的配置工具即可获取该工具可以设置的选项的当前设置。此输出采用 JSON 格式。以下示例演示了如何为 weblog.conf 配置文件中定义的调度程序从调度程序和群集工具中获取配置信息:

$ vkconfig scheduler --read --conf weblog.conf
{"version":"v9.2.0", "frame_duration":"00:00:10", "resource_pool":"weblog_pool",
 "config_refresh":"00:05:00", "new_source_policy":"FAIR",
 "pushback_policy":"LINEAR", "pushback_max_count":5, "auto_sync":true,
 "consumer_group_id":null}

$ vkconfig cluster --read --conf weblog.conf
{"cluster":"kafka_weblog", "hosts":"kafak01.example.com:9092,kafka02.example.com:9092"}

--read 选项将列出该工具在调度程序架构中创建的所有值。例如,如果已在调度程序中定义多个目标,则 --read 选项会列出所有目标。

$ vkconfig target --list --conf weblog.conf
{"target_schema":"public", "target_table":"health_data"}
{"target_schema":"public", "target_table":"iot_data"}
{"target_schema":"public", "target_table":"web_hits"}

可以使用 vkconfig 工具接受的其他实参来筛选 --read 选项输出。例如,在群集工具中,可以使用 --host 实参将输出限制为仅显示包含特定主机的群集。这些实参支持 LIKE 谓词通配符,因此可以匹配部分值。有关使用通配符的详细信息,请参阅 LIKE 谓词

以下示例演示了如何使用 --host 实参来筛选群集工具的 --read 选项的输出。第一次调用显示未经筛选的输出。第二次调用可筛选输出,以仅显示以“kafka”开头的群集:

$ vkconfig cluster --read --conf weblog.conf
{"cluster":"some_cluster", "hosts":"host01.example.com"}
{"cluster":"iot_cluster",
 "hosts":"kafka-iot01.example.com:9092,kafka-iot02.example.com:9092"}
{"cluster":"weblog",
 "hosts":"web01.example.com.com:9092,web02.example.com:9092"}
{"cluster":"streamcluster1",
 "hosts":"kafka-a-01.example.com:9092,kafka-a-02.example.com:9092"}
{"cluster":"test_cluster",
 "hosts":"test01.example.com:9092,test02.example.com:9092"}

$ vkconfig cluster --read --conf weblog.conf --hosts kafka%
{"cluster":"iot_cluster",
 "hosts":"kafka-iot01.example.com:9092,kafka-iot02.example.com:9092"}
{"cluster":"streamcluster1",
 "hosts":"kafka-a-01.example.com:9092,kafka-a-02.example.com:9092"}

有关详细信息,请参阅群集工具选项加载规范工具选项微批处理工具选项调度程序工具选项目标工具选项源工具选项

获取流式传输数据加载统计信息

vkconfig 脚本的统计信息工具可用于查看调度程序微批处理的历史记录。可以使用以下条件的任意组合来筛选结果:

  • 微批处理的名称

  • 作为数据加载源的 Kafka 群集

  • 主题的名称

  • 主题内的分区

  • 数据加载所针对的 Vertica 架构和表

  • 日期和时间范围

  • 最新的微批处理

有关此工具中提供的所有选项,请参阅统计信息工具选项

以下示例将获取调度程序运行的最后两个微批处理:

$ vkconfig statistics --last 2 --conf weblog.conf
{"microbatch":"weblog", "target_schema":"public", "target_table":"web_hits",
 "source_name":"web_hits", "source_cluster":"kafka_weblog", "source_partition":0,
 "start_offset":73300, "end_offset":73399, "end_reason":"END_OF_STREAM",
 "end_reason_message":null, "partition_bytes":19588, "partition_messages":100,
 "timeslice":"00:00:09.807000", "batch_start":"2018-11-02 13:22:07.825295",
 "batch_end":"2018-11-02 13:22:08.135299", "source_duration":"00:00:00.219619",
 "consecutive_error_count":null, "transaction_id":45035996273976123,
 "frame_start":"2018-11-02 13:22:07.601", "frame_end":null}
{"microbatch":"weblog", "target_schema":"public", "target_table":"web_hits",
 "source_name":"web_hits", "source_cluster":"kafka_weblog", "source_partition":0,
 "start_offset":73200, "end_offset":73299, "end_reason":"END_OF_STREAM",
 "end_reason_message":null, "partition_bytes":19781, "partition_messages":100,
 "timeslice":"00:00:09.561000", "batch_start":"2018-11-02 13:21:58.044698",
 "batch_end":"2018-11-02 13:21:58.335431", "source_duration":"00:00:00.214868",
 "consecutive_error_count":null, "transaction_id":45035996273976095,
 "frame_start":"2018-11-02 13:21:57.561", "frame_end":null}

以下示例将从名为 web_hits 的源获取介于 2018 年 11 月 2 日 13:21:00 到 13:21:20 之间的微批处理:

$ vkconfig statistics --source "web_hits" --from-timestamp \
           "2018-11-02 13:21:00" --to-timestamp "2018-11-02 13:21:20"  \
           --conf weblog.conf
{"microbatch":"weblog", "target_schema":"public", "target_table":"web_hits",
 "source_name":"web_hits", "source_cluster":"kafka_weblog", "source_partition":0,
 "start_offset":72800, "end_offset":72899, "end_reason":"END_OF_STREAM",
 "end_reason_message":null, "partition_bytes":19989, "partition_messages":100,
 "timeslice":"00:00:09.778000", "batch_start":"2018-11-02 13:21:17.581606",
 "batch_end":"2018-11-02 13:21:18.850705", "source_duration":"00:00:01.215751",
 "consecutive_error_count":null, "transaction_id":45035996273975997,
 "frame_start":"2018-11-02 13:21:17.34", "frame_end":null}
{"microbatch":"weblog", "target_schema":"public", "target_table":"web_hits",
 "source_name":"web_hits", "source_cluster":"kafka_weblog", "source_partition":0,
 "start_offset":72700, "end_offset":72799, "end_reason":"END_OF_STREAM",
 "end_reason_message":null, "partition_bytes":19640, "partition_messages":100,
 "timeslice":"00:00:09.857000", "batch_start":"2018-11-02 13:21:07.470834",
 "batch_end":"2018-11-02 13:21:08.737255", "source_duration":"00:00:01.218932",
 "consecutive_error_count":null, "transaction_id":45035996273975978,
 "frame_start":"2018-11-02 13:21:07.309", "frame_end":null}

有关使用此工具的更多示例,请参阅统计信息工具选项

4.4 - 解析自定义格式

要处理 Kafka 数据流,解析器必须识别每条消息之间的边界。Vertica 提供的 Kafka 解析器可以识别 AvroJSON原始数据格式的边界,但您的数据流可能会使用自定义格式。为解析自定义格式,Vertica 提供了筛选器,可在数据流到达解析器之前在数据流中插入边界信息。

Kafka 筛选器

Vertica 提供以下筛选器:

  • KafkaInsertDelimiters:在数据流中的每条消息之间插入用户指定的分隔符。该分隔符可以包含任意字符,具有任意长度。此解析器使用以下语法:

    KafkaInsertDelimiters(delimiter = 'delimiter')
  • KafkaInsertLengths:在消息的开头插入消息长度(以字节为单位)。Vertica 按 big-endian 网络字节顺序将长度写入为 4 字节 uint32 值。例如,100 字节的消息会加上 0x00000064 前缀。此解析器使用以下语法:

    KafkaInsertLengths()

除了使用其中一个 Kafka 筛选器之外,还可以在单个 COPY 语句中添加一个或多个用户定义的筛选器。请以逗号分隔的列表指定多个筛选器,并先列出 Vertica 筛选器。如果使用非 Kafka 解析器,则必须至少使用一个筛选器为解析器准备数据流,否则解析器将失败并返回错误。

示例

以下 COPY 语句将从名为 iot-data 的主题中的两个分区加载逗号分隔值。处理完两个分区中的所有消息后,加载将退出。KafkaInsertDelimiters 筛选器将在 Kafka 消息之间插入换行符,以将其转换为传统的数据行。该语句将使用标准 COPY 解析器以逗号分隔 CSV 值:

=> COPY kafka_iot SOURCE KafkaSource(stream='iot-data|0|-2,iot-data|1|-2',
                                     brokers='kafka01:9092',
                                     stop_on_eof=True)
                  FILTER KafkaInsertDelimiters(delimiter = E'\n')
                  DELIMITER ',';
 Rows Loaded
-------------
        3430
(1 row)

5 - 为 Kafka 生成数据

除了使用 Kafka 中的数据之外,Vertica 还可以为 Kafka 生成数据。从 Vertica 流式传输以下数据以供其他 Kafka 使用者使用:

  • Vertica 分析结果。使用 KafkaExport 导出 Vertica 表和查询。

  • 数据收集器表中的运行状况和性能数据。创建基于推送的通知程序,发送此数据供第三方监控工具使用。

  • 临时消息。使用 NOTIFY 表示存储过程等任务已完成。

5.1 - 使用 KafkaExport 生成数据

使用 KafkaExport 函数可以将数据从 Vertica 流式传输到 Kafka。您可以向此函数传递三个实参以及两个或三个参数:

SELECT KafkaExport(partitionColumn, keyColumn, valueColumn
    USING PARAMETERS brokers='host[:port][,host...]',
    topic='topicname'
    [,kafka_conf='kafka_configuration_setting']
    [,fail_on_conf_parse_error=Boolean])
OVER (partition_clause) FROM table;

partitionColumnkeyColumn 实参分别用于设置 Kafka 主题的分区和键值。您可以将这两个值中的任何一个或两个都设为 NULL。如果将分区设为 NULL,Kafka 将使用其默认分区架构(如果键值为 NULL,则随机分配分区,否则根据键值进行分配)。

valueColumn 实参是 LONG VARCHAR,其中包含要发送到 Kafka 的消息数据。Kafka 不会对消息内容强加结构。唯一能对消息格式添加的限制是指定数据使用者能够解析的内容。

您可以按照自己喜欢的任何方式自由地将数据转换为字符串。对于简单消息(如逗号分隔的列表),可以使用 CONCAT 等函数将各个值组合为消息。如果需要更复杂的数据格式(如 JSON),请考虑编写 UDx 函数,该函数接受数据列并以您所需的格式输出包含数据的 LONG VARCHAR。有关详细信息,请参阅开发用户定义的扩展 (UDx)

有关 KafkaExport 语法的详细信息,请参阅 KafkaExport

导出示例

以下示例为您显示了如何对表的多个列执行简单导出。假设您拥有以下包含一组简单的物联网 (IOT) 数据的表:

=> SELECT * FROM iot_report LIMIT 10;
 server |        date         |       location        |    id
--------+---------------------+-----------------------+----------
      1 | 2016-10-11 04:09:28 | -14.86058, 112.75848  | 70982027
      1 | 2017-07-02 12:37:48 | -21.42197, -127.17672 | 49494918
      1 | 2017-10-19 14:04:33 | -71.72156, -36.27381  | 94328189
      1 | 2018-07-11 19:35:18 | -9.41315, 102.36866   | 48366610
      1 | 2018-08-30 08:09:45 | 83.97962, 162.83848   |   967212
      2 | 2017-01-20 03:05:24 | 37.17372, 136.14026   | 36670100
      2 | 2017-07-29 11:38:37 | -38.99517, 171.72671  | 52049116
      2 | 2018-04-19 13:06:58 | 69.28989, 133.98275   | 36059026
      2 | 2018-08-28 01:09:51 | -59.71784, -144.97142 | 77310139
      2 | 2018-09-14 23:16:15 | 58.07275, 111.07354   |  4198109
(10 rows)

=> \d iot_report
                                       List of Fields by Tables
 Schema |   Table    |  Column  |    Type     | Size | Default | Not Null | Primary Key | Foreign Key
--------+------------+----------+-------------+------+---------+----------+-------------+-------------
 public | iot_report | server   | int         |    8 |         | f        | f           |
 public | iot_report | date     | timestamp   |    8 |         | f        | f           |
 public | iot_report | location | varchar(40) |   40 |         | f        | f           |
 public | iot_report | id       | int         |    8 |         | f        | f           |
(4 rows)

您希望将此表中的数据发送到名为 iot_results 的 Kafka 主题以供其他应用程序使用。查看相应数据和 iot_report 的结构后,您可能会做出以下决定:

  • server 列与 iot_report 中的分区非常匹配。Kafka 主题中有三个分区,server 列中的值介于 1 和 3 之间。假设分区列的值范围更大(例如,介于 1 和 100 之间)。那么您就可以使用取模运算符 (%) 将这些值强制转换为与分区数相同的范围 (server % 3)。
    这些值的复杂之处在于 server 列中的值从 1 开始(该列中的最小值为 1)。Kafka 的分区编号架构从零开始。因此,调整 server 列中的值时必须减去 1。

  • id 列可以充当键。此列的数据类型为 INTEGER。KafkaExport 函数的键值应为 VARCHAR。Vertica 不会自动将 INTEGER 值转换为 VARCHAR,因此您必须在函数调用中显式转换该值。

  • iot_report 主题的使用者希望值采用逗号分隔格式。您可以使用对 CONCAT 函数的嵌套调用将 date 和 location 列中的值合并到一个 VARCHAR 中。

您需要了解的最后一条信息是 Kafka 群集中代理的主机名和端口号。在此示例中,有两个名为 kafka01 和 kafka03 的代理在端口 6667(Hortonworks 群集使用的端口)上运行。获取所有这些信息后,即可随时导出数据。

以下示例显示了如何导出 iot_report 的内容:

=> SELECT KafkaExport(server - 1, id::VARCHAR,
   CONCAT(CONCAT(date, ', '), location)
   USING PARAMETERS brokers='kafka01:6667,kafka03:6667',
   topic='iot_results') OVER (PARTITION BEST) FROM iot_report;
 partition | key | message | failure_reason
-----------+-----+---------+----------------
(0 rows)

KafkaExport 返回了 0 行,这意味着 Vertica 能够将您的所有数据发送到 Kafka,而不会出现任何错误。

关于此示例的其他注意事项:

  • CONCAT 函数会自动为您将 data 列的 DATETIME 值转换为 VARCHAR,以便您无需对其进行显式转换。

  • 需要使用两个嵌套的 CONCAT 函数将 date 字段与逗号连接起来,然后将生成的字符串与 location 字段连接起来。

  • 向 message 字段添加第三列需要再进行两次 CONCAT 函数调用(一次用于在 location 列后连接逗号,一次用于连接其他列的值)。在加载相当于几列的数据之后,使用 CONCAT 将变得棘手。

在 Kafka 端,您将看到作为 KafkaExport 函数的 valueColumn(第三个)实参发送的任何内容。在上面的示例中,此内容为 CSV 列表。如果在运行示例查询之前已为 iot_results 主题启动控制台使用者,您将在查询运行时看到以下输出:

$ /opt/kafka/bin/kafka-console-consumer.sh --topic iot_results --zookeeper localhost
2017-10-10 12:08:33, 78.84883, -137.56584
2017-12-06 16:50:57, -25.33024, -157.91389
2018-01-12 21:27:39, 82.34027, 116.66703
2018-08-19 00:02:18, 13.00436, 85.44815
2016-10-11 04:09:28, -14.86058, 112.75848
2017-07-02 12:37:48, -21.42197, -127.17672
2017-10-19 14:04:33, -71.72156, -36.27381
2018-07-11 19:35:18, -9.41315, 102.36866
2018-08-30 08:09:45, 83.97962, 162.83848
2017-01-20 03:05:24, 37.17372, 136.14026
2017-07-29 11:38:37, -38.99517, 171.72671
2018-04-19 13:06:58, 69.28989, 133.98275
2018-08-28 01:09:51, -59.71784, -144.97142
2018-09-14 23:16:15, 58.07275, 111.07354

KafkaExport 的返回值

KafkaExport 将输出 Kafka 已拒绝的所有行。例如,假设您在前面的示例中忘记将分区列调整为从零开始。则导出到 Kafka 的某些行会指定一个不存在的分区。在这种情况下,Kafka 会拒绝这些行,并且 KafkaExport 会以表格式报告它们:

=> SELECT KafkaExport(server, id::VARCHAR,
   CONCAT(CONCAT(date, ', '), location)
   USING PARAMETERS brokers='kafka01:6667,kafka03:6667',
   topic='iot_results') OVER (PARTITION BEST) FROM iot_report;
 partition |   key    |                  message                    |      failure_reason
-----------+----------+---------------------------------------------+--------------------------
         3 | 40492866 | 2017-10-10 12:08:33, 78.84883, -137.56584,  | Local: Unknown partition
         3 | 73846006 | 2017-12-06 16:50:57, -25.33024, -157.91389, | Local: Unknown partition
         3 | 45020829 | 2018-01-12 21:27:39, 82.34027, 116.66703,   | Local: Unknown partition
         3 | 27462612 | 2018-08-19 00:02:18, 13.00436, 85.44815,    | Local: Unknown partition
(4 rows)

您可以通过创建表保存拒绝的行来捕获此输出。然后,使用 INSERT 语句插入 KafkaExport 的结果:

=> CREATE TABLE export_rejects (partition INTEGER, key VARCHAR, message LONG VARCHAR, failure_reason VARCHAR);
CREATE TABLE
=> INSERT INTO export_rejects SELECT KafkaExport(server, id::VARCHAR,
   CONCAT(CONCAT(date, ', '), location)
   USING PARAMETERS brokers='kafka01:6667,kafka03:6667',
   topic='iot_results') OVER (PARTITION BEST) FROM iot_report;
 OUTPUT
--------
      4
(1 row)
=> SELECT * FROM export_rejects;
 partition |   key    |                  message                   |      failure_reason
-----------+----------+--------------------------------------------+--------------------------
         3 | 27462612 | 2018-08-19 00:02:18, 13.00436, 85.44815    | Local: Unknown partition
         3 | 40492866 | 2017-10-10 12:08:33, 78.84883, -137.56584  | Local: Unknown partition
         3 | 73846006 | 2017-12-06 16:50:57, -25.33024, -157.91389 | Local: Unknown partition
         3 | 45020829 | 2018-01-12 21:27:39, 82.34027, 116.66703   | Local: Unknown partition
(4 rows)

5.2 - 使用通知程序生成 Kafka 消息

可以使用通知程序帮助您监控 Vertica 数据库,使用第三方 Kafka 感知工具向 Kafka 主题生成消息。可以直接发布消息(例如,通过 SQL 脚本指示长时间运行的查询已完成)。通知程序还可以在数据收集器表中的组件更新时自动发送消息。

5.2.1 - 创建 Kafka 通知程序

通过以下过程创建 Kafka 通知程序。通知程序至少定义:

  • 一个独特的名称。

  • 一种消息协议。向 Kafka 发送消息时,它是 kafka://

  • 要与之通信的服务器。对于 Kafka,它是 Kafka 代理的地址和端口号。

  • 最大消息缓冲区大小。如果要通过通知程序发送的消息队列超过此限制,则丢弃消息。

使用 CREATE NOTIFIER 创建通知程序。以下示例创建一个名为 load_progress_notifier 的通知程序,该通知程序通过运行于端口 9092 上的 kafka01.example.com 的 Kafka 代理发送消息:

=> CREATE NOTIFIER load_progress_notifier
    ACTION 'kafka://kafka01.example.com:9092'
    MAXMEMORYSIZE '10M';

虽然不是必需项,但最佳实践是创建使用加密连接的通知程序。以下示例创建一个使用加密连接的通知程序,并使用提供的 CA 捆绑包验证 Kafka 服务器的证书:

=> CREATE NOTIFIER encrypted_notifier
    ACTION 'kafka://127.0.0.1:9092'
    MAXMEMORYSIZE '10M'
    TLSMODE 'verify-ca'
    CA BUNDLE ca_bundle;

按照此步骤,为使用 SASL_SSL 的 Kafka 端点创建或更改通知程序。请注意,每当您更改给定通知程序的 TLSMODE、证书或 CA 捆绑包时,都必须重复此步骤。

  1. 在设置 TLSMODE、证书和 CA 捆绑包时,使用 CREATE 或 ALTER 以禁用通知程序。

    => ALTER NOTIFIER encrypted_notifier
        DISABLE
        TLSMODE 'verify-ca'
        CA BUNDLE ca_bundle2;
    
  2. 更改通知程序并为 SASL_SSL 设置适合的 rdkafka 适配器参数。

    => ALTER NOTIFIER encrypted_notifier PARAMETERS
      'sasl.username=user;sasl.password=password;sasl.mechanism=PLAIN;security.protocol=SASL_SSL';
    
  3. 启用通知程序。

    => ALTER NOTIFIER encrypted_notifier ENABLE;
    

5.2.2 - 通过 Kafka 通知程序发送单个消息

可以使用 NOTIFY 函数通过 Kafka 通知程序发送单个消息。此功能对于向第三方报告工具报告 SQL 脚本(如 ETL 任务)的进度非常有用。

向此函数传递三个字符串值:

  • 要发送的消息。

  • 要发送消息的通知程序的名称。

  • 要接收消息的 Kafka 主题。

例如,假定要向前面创建的 load_progress_notifier 通知程序中定义的 Kafka 群集的 vertica_notifications 主题发送消息“每日加载完成 (Daily load finished)”。那么可以执行以下语句:

=> SELECT NOTIFY('Daily load finished.',
                 'load_progress_notifier',
                 'vertica_notifications');
 NOTIFY
--------
 OK
(1 row)

通知程序发送给 Kafka 的消息采用 JSON 格式。可以使用 Kafka 节点上的控制台使用者查看生成的消息。例如:

$ /opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
                                           --from-beginning \
                                           --topic vertica_notifications \
                                           --max-messages 1

{"_db":"vmart","_schema":"v_internal","_table":"dc_notifications",
"channel":"vertica_notifications","message":"Daily load finished.",
"node_name":"v_vmart_node0001","notifier":"load_progress_notifier",
"request_id":2,"session_id":"v_vmart_node0001-463079:0x4ba6f",
"statement_id":-1,"time":"2018-06-19 09:48:42.314181-04",
"transaction_id":45035996275565458,"user_id":45035996273704962,
"user_name":"dbadmin"}

Processed a total of 1 messages

5.2.3 - 使用 Kafka 通知程序监控 DC 表

Vertica 数据收集器 (DC) 表监控许多不同的数据库功能。当 DC 组件更新时,可以让通知程序自动向 Kafka 端点发送消息。可以查询 DATA_COLLECTOR 表以获取 DC 组件列表。

使用函数 SET_DATA_COLLECTOR_NOTIFY_POLICY 配置通知程序向 Kafka 发送 DC 组件更新。

要在登录尝试失败时收到通知,您可以创建一个通知程序,使其在 DC 组件 LoginFailures 更新时发送通知。TLSMODE 'verify-ca’ 将验证服务器的证书是否由受信任的 CA 签名。

=> CREATE NOTIFIER vertica_stats ACTION 'kafka://kafka01.example.com:9092' MAXMEMORYSIZE '10M' TLSMODE 'verify-ca';
CREATE NOTIFIER
=> SELECT SET_DATA_COLLECTOR_NOTIFY_POLICY('LoginFailures','vertica_stats', 'vertica_notifications', true);
SET_DATA_COLLECTOR_NOTIFY_POLICY
----------------------------------
 SET
(1 row)

与通过 NOTIFY 函数发送的消息一样,从 DC 组件发送到 Kafka 的数据采用 JSON 格式。前面的示例导致将如下消息发送到 vertica_notifications Kafka 主题:

{"_db":"vmart","_schema":"v_internal","_table":"dc_login_failures",
"authentication_method":"Reject","client_authentication_name":"",
"client_hostname":"::1","client_label":"","client_os_user_name":"dbadmin",
"client_pid":481535,"client_version":"","database_name":"alice",
"effective_protocol":"3.8","node_name":"v_vmart_node0001",
"reason":"INVALID USER","requested_protocol":"3.8","ssl_client_fingerprint":"",
"time":"2018-06-19 14:51:22.437035-04","user_name":"alice"}

查看 DC 组件的通知策略

使用 GET_DATA_COLLECTOR_NOTIFY_POLICY 函数列出为 DC 组件设置的策略。

=> SELECT GET_DATA_COLLECTOR_NOTIFY_POLICY('LoginFailures');
                   GET_DATA_COLLECTOR_NOTIFY_POLICY
----------------------------------------------------------------------
 Notifiable;  Notifier: vertica_stats; Channel: vertica_notifications
(1 row)

禁用通知策略

可以调用 SET_DATA_COLLECTOR_NOTIFY_POLICY 函数并将其第四个实参设置为 FALSE,以此来禁用通知策略。以下示例禁用 LoginFailures 组件的通知策略:

=> SELECT SET_DATA_COLLECTOR_NOTIFY_POLICY('LoginFailures','vertica_stats', 'vertica_notifications', false);
 SET_DATA_COLLECTOR_NOTIFY_POLICY
----------------------------------
 SET
(1 row)

=> SELECT GET_DATA_COLLECTOR_NOTIFY_POLICY('LoginFailures');
 GET_DATA_COLLECTOR_NOTIFY_POLICY
----------------------------------
 Not notifiable;
(1 row)

6 - 使用 Kafka 进行 TLS/SSL 加密

您可以在 Vertica、调度程序和 Kakfa 之间使用 TLS/SSL 加密。此加密可防止其他人访问在 Kafka 和 Vertica 之间发送的数据。此外,还可以验证数据流式传输过程中涉及的所有参与方的身份,以便没有冒充者可以冒充您的 Vertica 群集或 Kafka 代理。

在以下一些常见情况下,您会想要在 Vertica 与 Kafka 之间使用 SSL 加密:

  • 您的 Vertica 数据库和 Kafka 通过不安全的网络进行通信。例如,假设 Kafka 群集位于云服务中,而 Vertica 群集位于您的内部网络中。在这种情况下,从 Kafka 读取的任何数据都会通过不安全的连接在 Internet 上传输。

  • 根据安全策略、法律或其他要求,您需要对所有网络流量进行加密。

有关 Vertica 中 TLS/SSL 加密的详细信息,请参阅 TLS 协议

在调度程序与 Vertica 之间使用 TLS/SSL

调度程序连接到 Vertica 的方式与其他客户端应用程序连接到 Vertica 的方式相同。您可以通过两种方式将 Vertica 配置为对客户端使用 SSL/TLS 身份验证和加密:

  • 如果将 Vertica 配置为使用 SSL/TLS 服务器身份验证,则可以选择让调度程序确认 Vertica 服务器的身份。

  • 如果将 Vertica 配置为使用相互 SSL/TLS 身份验证,则可以将调度程序配置为向 Vertica 表明自己的身份,以及让它验证 Vertica 服务器的身份。根据您的数据库配置,Vertica 服务器可能会要求调度程序在连接时使用 TLS。有关详细信息,请参阅使用 TLS 进行客户端身份验证

有关与 Vertica 建立的加密客户端连接的信息,请参考 TLS 协议

调度程序在 Java 虚拟机 (JVM) 上运行,并使用 JDBC 连接到 Vertica。连接到 Vertica 时,它的行为与任何其他 JDBC 客户端一样。要对调度程序与 Vertica 的连接使用 TLS/SSL 加密,请使用 Java 密钥库和信任库机制来保存调度程序用于表明自己的身份和 Vertica 身份的密钥和证书。

  • 密钥库包含调度程序的私有加密密钥及其证书(公钥)。

  • 信任库包含您信任的 CA。如果启用身份验证,调度程序将使用这些 CA 来验证其连接到的 Vertica 群集的身份。如果已使用信任库中的某个 CA 对服务器的证书进行签名,则调度程序知道它可以信任 Vertica 服务器的身份。

您可以通过名为 VKCONFIG_JVM_OPTS 的 Linux 环境变量将各个选项传递给执行调度程序的 JVM。向修改调度程序的 JDBC 设置(例如,调度程序的 JDBC 连接的信任库和密钥库)的此变量添加参数。请参阅步骤 2:设置 VKCONFIG_JVM_OPTS 环境变量以查看相关示例。

此外,还可以使用 --jdbc-url 调度程序选项来修改 JDBC 配置。请参阅常用 vkconfig 脚本选项以了解有关调度程序选项的详细信息,并参阅 JDBC 连接属性以了解有关其可以修改的属性的详细信息。

在 Vertica 与 Kafka 之间使用 TLS/SSL

您可以通过两种方式将数据从 Kafka 流式传输到 Vertica:手动使用 COPY 语句和 KafkaSource UD 源函数,或自动使用调度程序。

要通过 SSL 连接直接从 Kafka 复制数据,需要设置包含 SSL 密钥和证书的会话变量。当 KafkaSource 发现您已设置这些变量时,它会使用该密钥和证书来创建与 Kafka 的安全连接。请参阅 Kafka TLS-SSL 示例第 4 部分:直接从 Kafka 加载数据以了解详细信息。

自动将数据从 Kafka 流式传输到 Vertica 时,配置调度程序的方式与使用 SSL 连接到 Vertica 的方式相同。当调度程序执行 COPY 语句以从 Kafka 加载数据时,它会使用自己的密钥库和信任库来创建与 Kafka 的 SSL 连接。

要在从 Vertica 向 Kafka 生成数据时使用 SSL 连接,需要设置通过 SSL 连接直接从 Kafka 流式传输数据时使用的同一会话变量。KafkaExport 函数会使用这些变量来建立与 Kafka 的安全连接。

有关对 Kafka 使用 SSL/TLS 身份验证的详细信息,请参阅 Apache Kafka 文档

6.1 - 计划在 Vertica 和 Kafka 之间进行 TLS/SSL 加密

开始配置 TLS/SSL 之前需注意的一些事项:

  • 需要对调度程序、Vertica 和 Kafka 之间的哪些连接加密?在某些情况下,可能只需在 Vertica 与 Kafka 之间启用加密。当 Vertica 和 Kafka 群集位于不同的网络中时,这种情况很常见。例如,假设 Kafka 托管在云提供商处,而 Vertica 托管在您的内部网络中。那么,数据在两者之间必须穿过不安全的 Internet。但是,如果 Vertica 和调度程序都位于本地网络中,则可以断定不需要将其配置为使用 SSL/TLS。在其他情况下,需要对系统的所有部分进行加密。例如,当 Kafka、Vertica 和调度程序都托管在网络可能不安全的云提供商处时,需要对所有流量进行加密。

  • 需要信任调度程序、Vertica 和 Kafka 之间的哪些连接?如果一个系统无法验证另一个系统的身份,则可以选择将其中的任意连接设为失败。请参阅下面的验证身份

  • 您将使用哪些 CA 对每个证书进行签名?配置调度程序、Kafka 和 Vertica 的最简单方法是使用同一 CA 对设置 TLS/SSL 时将使用的所有证书进行签名。使用同一根 CA 对证书进行签名时,需要能够编辑 Kafka、Vertica 和调度程序的配置。如果无法使用同一 CA 对所有证书进行签名,则所有信任库必须包含用于对证书进行签名的整个 CA 链(直到根 CA)。添加整个信任链可确保每个系统都可以验证彼此的身份。

验证身份

在 Vertica、调度程序和 Kafka 之间配置 TLS/SSL 加密时,主要挑战是确保调度程序、Kafka 和 Vertica 都可以验证彼此的身份。设置 TLS/SSL 加密时用户最常遇到的问题是,如何确保远程系统可以验证系统证书的真实性。防止出现此问题的最佳方法是,确保所有系统的证书均由所有系统明确信任的 CA 进行签名。

当一个系统尝试启动与另一个系统的加密连接时,它会将其证书发送到远程系统。此证书可由一个或多个帮助识别建立连接的系统的证书颁发机构 (CA) 进行签名。这些签名形成了“信任链”。一个证书由一个 CA 进行签名。反过来,该 CA 可能已由另一个 CA 进行签名,依此类推。通常,该链以来自知名商业证书提供商(例如 Comodo SSL 或 DigiCert)的 CA(称为根 CA)结束,默认情况下,这些证书在许多平台(例如操作系统和 Web 浏览器)上均受信任。

如果远程系统找到其信任的链中的某个 CA,则会验证建立连接的系统的身份,然后就可以继续建立连接。如果远程系统找不到其信任的 CA 的签名,则可能会阻止连接,具体取决于其配置。可以将系统配置为仅允许与身份已经过验证的系统建立的连接。

6.2 - 为调度程序配置 TLS 连接

调度程序可以将 TLS 用于两种不同的连接:一种是它与 Vertica 建立的连接,另一种是运行 COPY 语句从 Kafka 检索数据时建立的连接。由于调度程序是 Java 应用程序,因此需要提供 TLS 密钥以及用于在密钥库中对调度程序进行签名的证书。此外,还需要提供信任库,其中包含调度程序应信任的证书。与 Vertica 和 Kafka 建立的连接都可以使用相同的密钥库和信任库。您还可以通过为调度程序设置不同的 JDBC 设置,选择为这两个连接使用单独的密钥库和信任库。有关这些设置的列表,请参阅 JDBC 连接属性

请参阅 Kafka TLS-SSL 示例第 5 部分:配置调度程序以了解件将调度程序配置为使用 SSL 的详细步骤。

请注意,如果将 Kafka 服务器的参数 client.ssl.auth 设置为 nonerequested,则不需要创建密钥库。

6.3 - 直接从 Kafka 加载数据时使用 TLS/SSL

您可以使用 COPY 语句和 KafkaSource 用户定义的加载函数从 Kafka 手动加载数据(请参阅手动使用来自 Kafka 的数据)。要让 KafkaSource 打开与 Kafka 的安全连接,必须为它提供 SSL 密钥和其他信息。

启动时,KafkaSource 函数会检查是否已定义多个用户会话变量。这些变量包含 SSL 密钥、用于对密钥进行签名的证书,以及该函数创建 SSL 连接所需的其他信息。有关这些变量的列表,请参阅 Kafka 用户定义的会话参数。如果 KafkaSource 发现已定义这些变量,则会使用它们创建与 Kafka 的 SSL 连接。

请参阅 Kafka TLS-SSL 示例第 4 部分:直接从 Kafka 加载数据,以获取有关在直接从 Kafka 复制数据时如何配置和使用 SSL 连接的逐步指南。

导出数据时,KafkaExport 函数也会使用这些变量来建立与 Kafka 的安全连接。

6.4 - 为 Kafka 配置 TLS

此页面介绍为 Vertica、Kafka 和调度程序配置 TLS 连接的过程。

请注意,以下示例会为 ssl.client.auth=required 的 Kafka 服务器配置 TLS,该操作需要以下各项:

  • kafka_SSL_Certificate

  • kafka_SSL_PrivateKey_secret

  • kafka_SSL_PrivateKeyPassword_secret

  • 调度程序的密钥库

如果您的配置使用 ssl.client.auth=nonessl.client.auth=requested,则这些参数和调度程序密钥库为可选项。

为 Vertica 和客户端创建证书

以下示例中的 CA 证书为自签名证书。在生产环境中,应该为使用可信 CA。

以下示例将使用同一自签名根 CA 对调度程序、Kafka 代理和 Vertica 使用的所有证书进行签名。如果不能使用同一 CA 对所有这些系统的密钥进行签名,请确保将整个信任链包含在密钥库中。

有关详细信息,请参阅生成 TLS 证书和密钥

  1. 生成私钥 root.key

    $ openssl genrsa -out root.key
    Generating RSA private key, 2048 bit long modulus
    ..............................................................................
    ............................+++
    ...............+++
    e is 65537 (0x10001)
    
  2. 生成自签名 CA 证书。

    $ openssl req -new -x509 -key root.key -out root.crt
    You are about to be asked to enter information that will be incorporated
    into your certificate request.
    What you are about to enter is what is called a Distinguished Name or a DN.
    There are quite a few fields but you can leave some blank
    For some fields there will be a default value,
    If you enter '.', the field will be left blank.
    -----
    Country Name (2 letter code) [AU]:US
    State or Province Name (full name) [Some-State]:MA
    Locality Name (eg, city) []:Cambridge
    Organization Name (eg, company) [Internet Widgits Pty Ltd]:My Company
    Organizational Unit Name (eg, section) []:
    Common Name (e.g. server FQDN or YOUR name) []:*.mycompany.com
    Email Address []:myemail@mycompany.com
    
  3. 限制所有者对 root.keyroot.crt 的读取/写入权限。向其他组授予对 root.crt 的读取权限。

    
    $ ls
    root.crt  root.key
    $ chmod 600 root.key
    $ chmod 644 root.crt
    
  4. 生成服务器私钥 server.key

    $ openssl genrsa -out server.key
    Generating RSA private key, 2048 bit long modulus
    ....................................................................+++
    ......................................+++
    e is 65537 (0x10001)
    
  5. 为您的 CA 创建证书签名请求 (CSR)。请务必将“公用名称 (Common Name)”设置为通配符(星号),以便群集中的所有 Vertica 节点都接受证书:

    $ openssl req -new -key server.key -out server_reqout.txt
    You are about to be asked to enter information that will be incorporated
    into your certificate request.
    What you are about to enter is what is called a Distinguished Name or a DN.
    There are quite a few fields but you can leave some blank
    For some fields there will be a default value,
    If you enter '.', the field will be left blank.
    -----
    Country Name (2 letter code) [AU]:US
    State or Province Name (full name) [Some-State]:MA
    Locality Name (eg, city) []:Cambridge
    Organization Name (eg, company) [Internet Widgits Pty Ltd]:My Company
    Organizational Unit Name (eg, section) []:
    Common Name (e.g. server FQDN or YOUR name) []:*.mycompany.com
    Email Address []:myemail@mycompany.com
    
    Please enter the following 'extra' attributes
    to be sent with your certificate request
    A challenge password []: server_key_password
    An optional company name []:
    
  6. 使用您的 CA 对服务器证书进行签名。这将创建服务器证书 server.crt

    $ openssl x509 -req -in server_reqout.txt -days 3650 -sha1 -CAcreateserial -CA root.crt \
        -CAkey root.key -out server.crt
        Signature ok
        subject=/C=US/ST=MA/L=Cambridge/O=My Company/CN=*.mycompany.com/emailAddress=myemail@mycompany.com
        Getting CA Private Key
    
  7. 为密钥和证书设置相应的权限。

    $ chmod 600 server.key
    $ chmod 644 server.crt
    

创建客户端密钥和证书(仅限相互模式)

相互模式 下,客户端和服务器会在建立连接之前会验证彼此的证书。以下过程会创建客户端密钥和证书以提供给 Vertica。证书必须由 Vertica 信任的 CA 进行签名。

此操作的步骤与上述为 Vertica 创建服务器密钥和证书的步骤相同。

$ openssl genrsa -out client.key
Generating RSA private key, 2048 bit long modulus
................................................................+++
..............................+++
e is 65537 (0x10001)

$ openssl req -new -key client.key -out client_reqout.txt
You are about to be asked to enter information that will be incorporated
into your certificate request.
What you are about to enter is what is called a Distinguished Name or a DN.
There are quite a few fields but you can leave some blank
For some fields there will be a default value,
If you enter '.', the field will be left blank.
-----
Country Name (2 letter code) [AU]:US
State or Province Name (full name) [Some-State]:MA
Locality Name (eg, city) []:Cambridge
Organization Name (eg, company) [Internet Widgits Pty Ltd]:My Company
Organizational Unit Name (eg, section) []:
Common Name (e.g. server FQDN or YOUR name) []:*.mycompany.com
Email Address []:myemail@mycompany.com

Please enter the following 'extra' attributes
to be sent with your certificate request
A challenge password []: server_key_password
An optional company name []:

$ openssl x509 -req -in client_reqout.txt -days 3650 -sha1 -CAcreateserial -CA root.crt \
  -CAkey root.key -out client.crt
Signature ok
subject=/C=US/ST=MA/L=Cambridge/O=My Company/CN=*.mycompany.com/emailAddress=myemail@mycompany.com
Getting CA Private Key

$ chmod 600 client.key
$ chmod 644 client.crt

设置相互模式客户端-服务器 TLS

为相互模式配置 Vertica

对于相互模式,必须导入以下密钥和证书,然后使用 TLS CONFIGURATION 将其分发到 Vertica 群集上的节点:

  • root.key

  • root.crt

  • server.key

  • server.crt

您可以通过查询 CRYPTOGRAPHIC_KEYSCERTIFICATES 来查看现有的密钥和证书。

  1. 使用 CREATE KEYCREATE CERTIFICATE 将服务器和根密钥及证书导入到 Vertica 中。有关详细信息,请参阅生成 TLS 证书和密钥

    => CREATE KEY imported_key TYPE 'RSA' AS '-----BEGIN PRIVATE KEY-----...-----END PRIVATE KEY-----';
    => CREATE CA CERTIFICATE imported_ca AS '-----BEGIN CERTIFICATE-----...-----END CERTIFICATE-----';
    => CREATE CERTIFICATE imported_cert AS '-----BEGIN CERTIFICATE-----...-----END CERTIFICATE-----';
    

    在以下示例中,\set 用于检索 root.keyroot.crtserver.keyserver.crt 的内容。

    => \set ca_cert ''''`cat root.crt`''''
    => \set serv_key ''''`cat server.key`''''
    => \set serv_cert ''''`cat server.crt`''''
    
    => CREATE CA CERTIFICATE root_ca AS :ca_cert;
    CREATE CERTIFICATE
    => CREATE KEY server_key TYPE 'RSA' AS :serv_key;
    CREATE KEY
    => CREATE CERTIFICATE server_cert AS :serv_cert;
    CREATE CERTIFICATE
    
  2. 按照配置客户端-服务器 TLS 中针对相互模式 的步骤设置合适的 TLSMODE 和 TLS CONFIGURATION 参数。

为相互模式配置客户端

客户端必须有自己的私钥、证书和 CA 证书。建立连接时,证书将提供给 Vertica,并将使用 CA 证书来验证 Vertica 中的服务器证书。

以下示例为相互模式配置 vsql 客户端。

  1. 在用户的主目录中创建 .vsql 目录。

    $ mkdir ~/.vsql
    
  2. client.keyclient.crtroot.crt 复制到 vsql 目录。

    $ cp client.key client.crt root.crt ~/.vsql
    
  3. 使用 vsql 登录到 Vertica 并查询 SESSIONS 系统表,以验证连接是否使用的是相互模式:

    $ vsql
    Password: user-password
    Welcome to vsql, the Vertica Analytic Database interactive terminal.
    
    Type:  \h or \? for help with vsql commands
           \g or terminate with semicolon to execute query
           \q to quit
    
    SSL connection (cipher: DHE-RSA-AES256-GCM-SHA384, bits: 256, protocol: TLSv1.2)
    
    => select user_name,ssl_state from sessions;
     user_name | ssl_state
    -----------+-----------
     dbadmin   | Mutual
    (1 row)
    

为 Kafka 配置 TLS

配置 Kafka 代理

以下过程会将 Kafka 配置为对客户端连接使用 TLS。此外,还可以将 Kafka 配置为使用 TLS 在代理之间进行通信。但是,代理间的 TLS 对在 Vertica 与 Kafka 之间建立加密连接没有影响。

  1. 为所有 Kafka 代理创建信任库文件,从而导入 CA 证书。以下示例将使用上面创建的自签名 root.crt

    => $ keytool -keystore kafka.truststore.jks -alias CARoot -import \
                   -file root.crt
    Enter keystore password: some_password
    Re-enter new password: some_password
    Owner: EMAILADDRESS=myemail@mycompany.com, CN=*.mycompany.com, O=MyCompany, L=Cambridge, ST=MA, C=US
    Issuer: EMAILADDRESS=myemail@mycompany.com, CN=*.mycompany.com, O=MyCompany, L=Cambridge, ST=MA, C=US
    Serial number: c3f02e87707d01aa
    Valid from: Fri Mar 22 13:37:37 EDT 2019 until: Sun Apr 21 13:37:37 EDT 2019
    Certificate fingerprints:
             MD5:  73:B1:87:87:7B:FE:F1:6E:94:55:FD:AF:5D:D0:C3:0C
             SHA1: C0:69:1C:93:54:21:87:C7:03:93:FE:39:45:66:DE:22:18:7E:CD:94
             SHA256: 23:03:BB:B7:10:12:50:D9:C5:D0:B7:58:97:41:1E:0F:25:A0:DB:
                     D0:1E:7D:F9:6E:60:8F:79:A6:1C:3F:DD:D5
    Signature algorithm name: SHA256withRSA
    Subject Public Key Algorithm: 2048-bit RSA key
    Version: 3
    
    Extensions:
    
    #1: ObjectId: 2.5.29.35 Criticality=false
    AuthorityKeyIdentifier [
    KeyIdentifier [
    0000: 50 69 11 64 45 E9 CC C5   09 EE 26 B5 3E 71 39 7C  Pi.dE.....&.>q9.
    0010: E5 3D 78 16                                        .=x.
    ]
    ]
    
    #2: ObjectId: 2.5.29.19 Criticality=false
    BasicConstraints:[
      CA:true
      PathLen:2147483647
    ]
    
    #3: ObjectId: 2.5.29.14 Criticality=false
    SubjectKeyIdentifier [
    KeyIdentifier [
    0000: 50 69 11 64 45 E9 CC C5   09 EE 26 B5 3E 71 39 7C  Pi.dE.....&.>q9.
    0010: E5 3D 78 16                                        .=x.
    ]
    ]
    
    Trust this certificate? [no]:  yes
    Certificate was added to keystore
    
  2. 为名为 kafka01 的 Kafka 代理创建密钥库文件。每个代理的密钥库都应该唯一。

    keytool 命令会添加在建立 TLS 连接时用作备用名称的主题备用名称 (SAN)。使用 Kafka 代理的完全限定域名 (FQDN) 作为 SAN 和“您的名字和姓氏是什么? (What is your first and last name?)”提示问题的值。

    在以下示例中,FQDN 为 kafka01.example.comkeytool 的别名将设置为 localhost,因此与代理的本地连接将使用 TLS。

    $ keytool -keystore kafka01.keystore.jks -alias localhost -validity 365 -genkey -keyalg RSA \
          -ext SAN=DNS:kafka01.mycompany.com
    Enter keystore password: some_password
    Re-enter new password: some_password
    What is your first and last name?
      [Unknown]:  kafka01.mycompany.com
    What is the name of your organizational unit?
      [Unknown]:
    What is the name of your organization?
      [Unknown]: MyCompany
    What is the name of your City or Locality?
      [Unknown]:  Cambridge
    What is the name of your State or Province?
      [Unknown]:  MA
    What is the two-letter country code for this unit?
      [Unknown]:  US
    Is CN=Database Admin, OU=MyCompany, O=Unknown, L=Cambridge, ST=MA, C=US correct?
      [no]:  yes
    
    Enter key password for <localhost>
            (RETURN if same as keystore password):
    
  3. 导出 Kafka 代理的证书。在以下示例中,证书将导出为 kafka01.unsigned.crt

    $ keytool -keystore kafka01.keystore.jks -alias localhost \
                    -certreq -file kafka01.unsigned.crt
     Enter keystore password: some_password
    
  4. 使用 CA 证书对代理的证书进行签名。

    $ openssl x509 -req -CA root.crt -CAkey root.key -in kafka01.unsigned.crt \
                 -out kafka01.signed.crt -days 365 -CAcreateserial
    Signature ok
    subject=/C=US/ST=MA/L=Cambridge/O=Unknown/OU=MyCompany/CN=Database Admin
    Getting CA Private Key
    
  5. 将 CA 证书导入到代理的密钥库中。

    $ keytool -keystore kafka01.keystore.jks -alias CARoot -import -file root.crt
    Enter keystore password: some_password
    Owner: EMAILADDRESS=myemail@mycompany.com, CN=*.mycompany.com, O=MyCompany, L=Cambridge, ST=MA, C=US
    Issuer: EMAILADDRESS=myemail@mycompany.com, CN=*.mycompany.com, O=MyCompany, L=Cambridge, ST=MA, C=US
    Serial number: c3f02e87707d01aa
    Valid from: Fri Mar 22 13:37:37 EDT 2019 until: Sun Apr 21 13:37:37 EDT 2019
    Certificate fingerprints:
             MD5:  73:B1:87:87:7B:FE:F1:6E:94:55:FD:AF:5D:D0:C3:0C
             SHA1: C0:69:1C:93:54:21:87:C7:03:93:FE:39:45:66:DE:22:18:7E:CD:94
             SHA256: 23:03:BB:B7:10:12:50:D9:C5:D0:B7:58:97:41:1E:0F:25:A0:DB:D0:1E:7D:F9:6E:60:8F:79:A6:1C:3F:DD:D5
    Signature algorithm name: SHA256withRSA
    Subject Public Key Algorithm: 2048-bit RSA key
    Version: 3
    
    Extensions:
    
    #1: ObjectId: 2.5.29.35 Criticality=false
    AuthorityKeyIdentifier [
    KeyIdentifier [
    0000: 50 69 11 64 45 E9 CC C5   09 EE 26 B5 3E 71 39 7C  Pi.dE.....&.>q9.
    0010: E5 3D 78 16                                        .=x.
    ]
    ]
    
    #2: ObjectId: 2.5.29.19 Criticality=false
    BasicConstraints:[
      CA:true
      PathLen:2147483647
    ]
    
    #3: ObjectId: 2.5.29.14 Criticality=false
    SubjectKeyIdentifier [
    KeyIdentifier [
    0000: 50 69 11 64 45 E9 CC C5   09 EE 26 B5 3E 71 39 7C  Pi.dE.....&.>q9.
    0010: E5 3D 78 16                                        .=x.
    ]
    ]
    
    Trust this certificate? [no]:  yes
    Certificate was added to keystore
    
  6. 将已签名的 Kafka 代理证书导入到密钥库中。

    $ keytool -keystore kafka01.keystore.jks -alias localhost \
                    -import -file kafka01.signed.crt
    Enter keystore password: some_password
    Owner: CN=Database Admin, OU=MyCompany, O=Unknown, L=Cambridge, ST=MA, C=US
    Issuer: EMAILADDRESS=myemail@mycompany.com, CN=*.mycompany.com, O=MyCompany, L=Cambridge, ST=MA, C=US
    Serial number: b4bba9a1828ecaaf
    Valid from: Tue Mar 26 12:26:34 EDT 2019 until: Wed Mar 25 12:26:34 EDT 2020
    Certificate fingerprints:
                MD5:  17:EA:3E:15:B4:15:E9:93:67:EE:59:C0:4F:D1:4C:01
                SHA1: D5:35:B7:F7:44:7C:D6:B4:56:6F:38:2D:CD:3A:16:44:19:C1:06:B7
                SHA256: 25:8C:46:03:60:A7:4C:10:A8:12:8E:EA:4A:FA:42:1D:A8:C5:FB:65:81:74:CB:46:FD:B1:33:64:F2:A3:46:B0
    Signature algorithm name: SHA256withRSA
    Subject Public Key Algorithm: 2048-bit RSA key
    Version: 1
    Trust this certificate? [no]:  yes
    Certificate was added to keystore
    
  7. 如果未登录到要为其准备密钥库的 Kafka 代理,请使用 scp 将信任库和密钥库复制到该代理中。如果您已决定在代理的文件系统中存储密钥库和信任库文件的位置,可以直接将其复制到最终目标位置。以下示例只是将它们临时复制到 root 用户的主目录。下一步将它们移动到最终位置。

    $ scp kafka.truststore.jks kafka01.keystore.jks root@kafka01.mycompany.com:
    root@kafka01.mycompany.com's password: root_password
    kafka.truststore.jks                              100% 1048     1.0KB/s   00:00
    kafka01.keystore.jks                              100% 3277     3.2KB/s   00:00
    
  8. 对剩余的 Kafka 代理重复步骤 2 到 7。

允许 Kafka 读取密钥库和信任库

如果在上一步中没有将信任库和密钥库复制到 Kafka 可以读取它们的目录,则必须将其复制到代理上的最终位置。此外,还必须允许用于运行 Kafka 的用户帐户读取这些文件。确保该用户能够访问这些文件的最简单方法是授予该用户对这些文件的所有权。

在以下示例中,Kafka 由 Linux 用户 kafka 运行。如果使用其他用户运行 Kafka,请务必对信任库和密钥库文件设置相应的权限。

  1. 以 root 用户身份登录到 Kafka 代理。

  2. 将信任库和密钥库复制到 Kafka 可以访问它们的目录。这些文件没有固定位置:您可以选择 /etc 下的目录,也可以选择通常用于存储配置文件的某个其他位置。以下示例会将它们从 root 用户的主目录复制到名为 /opt/kafka/config/ 的 Kafka 配置目录。在您自己的系统中,此配置目录可能位于不同的位置,具体取决于您安装 Kafka 的方式。

  3. 将信任库和密钥库复制到 Kafka 可以访问它们的目录。这些文件没有固定位置:您可以选择 /etc 下的目录,也可以选择通常用于存储配置文件的某个其他位置。以下示例会将它们从 root 用户的主目录复制到名为 /opt/kafka/config/ 的 Kafka 配置目录。在您自己的系统中,此配置目录可能位于不同的位置,具体取决于您安装 Kafka 的方式。

    ~# cd /opt/kafka/config/
    /opt/kafka/config# cp /root/kafka01.keystore.jks /root/kafka.truststore.jks .
    
  4. 如果您未以运行 Kafka 的用户帐户身份登录,请更改信任库和密钥库文件的所有权。以下示例会将所有权从 root 用户(即当前登录的用户)更改为 Kafka 用户:

    /opt/kafka/config# ls -l
    total 80
    ...
    -rw-r--r-- 1 kafka nogroup 1221 Feb 21  2018 consumer.properties
    -rw------- 1 root  root    3277 Mar 27 08:03 kafka01.keystore.jks
    -rw-r--r-- 1 root  root    1048 Mar 27 08:03 kafka.truststore.jks
    -rw-r--r-- 1 kafka nogroup 4727 Feb 21  2018 log4j.properties
    ...
    /opt/kafka/config# chown kafka kafka01.keystore.jks kafka.truststore.jks
    /opt/kafka/config# ls -l
    total 80
    ...
    -rw-r--r-- 1 kafka nogroup 1221 Feb 21  2018 consumer.properties
    -rw------- 1 kafka root    3277 Mar 27 08:03 kafka01.keystore.jks
    -rw-r--r-- 1 kafka root    1048 Mar 27 08:03 kafka.truststore.jks
    -rw-r--r-- 1 kafka nogroup 4727 Feb 21  2018 log4j.properties
    ...
    
  5. 对剩余的 Kafka 代理重复步骤 1 到 3。

将 Kafka 配置为使用 TLS

信任库和密钥库就位后,下一步是编辑 Kafka 的 server.properties 配置文件,以告知 Kafka 使用 TLS/SSL 加密。此文件通常存储在 Kafka 配置目录中。此目录的位置取决于您安装的 Kafka 方式。在以下示例中,该文件位于 /opt/kafka/config 中。

编辑这些文件时,请务必不要更改其所有权。确保 Linux 不会更改文件所有权的最佳方法是使用 su 成为运行 Kafka 的用户帐户(假设您尚未以该用户身份登录):

$ /opt/kafka/config# su -s /bin/bash kafka

server.properties 文件包含采用 property=value 格式的 Kafka 代理设置。要将 Kafka 代理配置为使用 SSL,请更改或添加以下属性设置:

listeners
Kafka 代理侦听的主机名和端口。如果不使用 SSL 在代理之间建立连接,则必须同时提供 PLANTEXT 和 SSL 选项。例如: listeners=PLAINTEXT://hostname:9092,SSL://hostname:9093
ssl.keystore.location
密钥库文件的绝对路径。
ssl.keystore.password
密钥库文件的密码。
ssl.key.password
密钥库中 Kafka 代理密钥的密码。如果愿意,可以将此密码设为与密钥库密码不同。
ssl.truststore.location
信任库文件的位置。
ssl.truststore.password
用于访问信任库的密码。
ssl.enabled.protocols
Kafka 允许客户端使用的 TLS/SSL 协议。
ssl.client.auth
指定 SSL 身份验证是必需还是可选的。验证客户端身份需要使用此设置的最安全设置。

以下示例会将 Kafka 配置为通过 SSL 身份验证验证客户端身份。它不使用 SSL 与其他代理通信,因此 server.properties 文件会定义 SSL 和 PLAINTEXT 侦听器端口。它不会为侦听器端口提供主机名,以告知 Kafka 侦听默认网络接口。

为此配置添加到 kafka01 代理的 server.properties 副本的行如下所示:

listeners=PLAINTEXT://:9092,SSL://:9093
ssl.keystore.location=/opt/kafka/config/kafka01.keystore.jks
ssl.keystore.password=vertica
ssl.key.password=vertica
ssl.truststore.location=/opt/kafka/config/kafka.truststore.jks
ssl.truststore.password=vertica
ssl.enabled.protocols=TLSv1.2,TLSv1.1,TLSv1
ssl.client.auth=required

必须对所有代理上的 server.properties 文件进行此类更改。

对代理的 server.properties 文件进行更改后,请重新启动 Kafka。如何重新启动 Kafka 取决于安装方式:

  • 如果 Kafka 作为 Hadoop 群集的一部分运行,则通常可以从用于控制 Hadoop 的任何界面(例如 Ambari)中重新启动它。

  • 如果 Kafka 是直接安装的,则可以通过直接运行 kafka-server-stop.shkafka-server-start.sh 脚本或者通过 Linux 系统的服务控制命令(如 systemctl)来重新启动它。必须在每个代理上运行此命令。

测试配置

如果尚未配置客户端身份验证,可以通过运行以下命令快速测试 Kafka 是否能够访问其密钥库:

$ openssl s_client -debug -connect broker_host_name:9093 -tls1

如果 Kafka 能够访问其密钥库,则以下命令将输出代理证书的转储(使用 CTRL+C 退出):

=> # openssl s_client -debug -connect kafka01.mycompany.com:9093 -tls1
CONNECTED(00000003)
write to 0xa4e4f0 [0xa58023] (197 bytes => 197 (0xC5))
0000 - 16 03 01 00 c0 01 00 00-bc 03 01 76 85 ed f0 fe   ...........v....
0010 - 60 60 7e 78 9d d4 a8 f7-e6 aa 5c 80 b9 a7 37 61   ``~x......\...7a
0020 - 8e 04 ac 03 6d 52 86 f5-84 4b 5c 00 00 62 c0 14   ....mR...K\..b..
0030 - c0 0a 00 39 00 38 00 37-00 36 00 88 00 87 00 86   ...9.8.7.6......
0040 - 00 85 c0 0f c0 05 00 35-00 84 c0 13 c0 09 00 33   .......5.......3
0050 - 00 32 00 31 00 30 00 9a-00 99 00 98 00 97 00 45   .2.1.0.........E
0060 - 00 44 00 43 00 42 c0 0e-c0 04 00 2f 00 96 00 41   .D.C.B...../...A
0070 - c0 11 c0 07 c0 0c c0 02-00 05 00 04 c0 12 c0 08   ................
0080 - 00 16 00 13 00 10 00 0d-c0 0d c0 03 00 0a 00 ff   ................
0090 - 01 00 00 31 00 0b 00 04-03 00 01 02 00 0a 00 1c   ...1............
00a0 - 00 1a 00 17 00 19 00 1c-00 1b 00 18 00 1a 00 16   ................
00b0 - 00 0e 00 0d 00 0b 00 0c-00 09 00 0a 00 23 00 00   .............#..
00c0 - 00 0f 00 01 01                                    .....
read from 0xa4e4f0 [0xa53ad3] (5 bytes => 5 (0x5))
0000 - 16 03 01 08 fc                                    .....
             . . .

但是,上述方法不是决定性方法;它只是告知您 Kafka 能否找到其密钥库。

Kafka 能否接受 TLS 连接的最佳测试是配置命令行 Kafka 生产者和使用者。要配置这些工具,必须先创建一个客户端密钥库。这些步骤与创建代理密钥库相同。

  1. 创建客户端密钥库:

    keytool -keystore client.keystore.jks -alias localhost -validity 365 -genkey -keyalg RSA -ext SAN=DNS:fqdn_of_client_system
    
  2. 使用您将用于运行生产者和/或使用者的系统的 FQDN 来回答“您的名字和姓氏是什么? (What is your first and last name)?”。使用贵组织的详细信息回答剩余提示问题。

  3. 导出客户端证书以便可以对其进行签名:

    keytool -keystore client.keystore.jks -alias localhost -certreq -file client.unsigned.cert
    
  4. 使用根 CA 对客户端证书进行签名:

    openssl x509 -req -CA root.crt -CAkey root.key -in client.unsigned.cert -out client.signed.cert \
            -days 365 -CAcreateserial
    
  5. 将根 CA 添加到密钥库:

    keytool -keystore client.keystore.jks -alias CARoot -import -file root.crt
    
  6. 将已签名的客户端证书添加到密钥库:

    keytool -keystore client.keystore.jks -alias localhost -import -file client.signed.cert
    
  7. 将密钥库复制到将使用它的位置。例如,可以选择将其复制到为 Kafka 代理复制了密钥库的同一目录中。如果选择将其复制到某个其他位置,或者打算使用某个其他用户来运行命令行客户端,请务必添加您为代理创建的信任库文件的副本。客户端可以重复使用此信任库文件以对 Kafka 代理进行身份验证,因为系统会使用同一 CA 对所有证书进行签名。此外,还要相应地设置该文件的所有权和权限。

接下来,必须创建将命令行客户端配置为使用 TLS 的属性文件(类似于代理的 server.properties 文件)。对于在名为 kafka01 的 Kafka 代理上运行的客户端,您的配置文件可能如下所示:

security.protocol=SSL
ssl.truststore.location=/opt/kafka/config/kafka.truststore.jks
ssl.truststore.password=trustore_password
ssl.keystore.location=/opt/kafka/config/client.keystore.jks
ssl.keystore.password=keystore_password
ssl.key.password=key_password
ssl.enabled.protocols=TLSv1.2,TLSv1.1,TLSv1
ssl.client.auth=required

此属性文件假设密钥库文件位于 Kafka 配置目录中。

最后,您可以运行命令行生产者或使用者,以确保他们可以连接和处理数据。您将向这些客户端提供刚创建的属性文件。以下示例假设已将属性文件存储在 Kafka 配置目录中,以及 Kafka 安装在 /opt/kafka 中:

~# cd /opt/kafka


/opt/kafka# bin/kafka-console-producer.sh --broker-list kafka01.mycompany.com:9093  \
                                          --topic test --producer.config config/client.properties
>test
>test again
>More testing. These messages seem to be getting through!
^D
/opt/kafka# bin/kafka-console-consumer.sh --bootstrap-server kafaka01.mycompany.com:9093  --topic test \
                                          --consumer.config config/client.properties --from-beginning
test
test again
More testing. These messages seem to be getting through!
^C
Processed a total of 3 messages

从 Kafka 加载数据

将 Kafka 配置为接受 TLS 连接后,验证是否可以直接将数据从其加载到 Vertica 中。即使计划创建调度程序以自动流式传输数据,也应该执行此步骤。

您可以选择创建单独的密钥和证书,用于直接从 Kafka 加载数据。以下示例会重复使用在此示例的第 2 部分中为 Vertica 服务器创建的密钥和证书。

您可以使用含有 COPY 语句的 KafkaSource 数据源函数直接从 Kafka 加载数据(请参阅手动使用来自 Kafka 的数据)。KafkaSource 函数会创建与 Kafka 的连接,因此它需要密钥、证书和相关密码来创建加密连接。您将通过会话参数传递这些信息。有关这些参数的列表,请参阅 Kafka 用户定义的会话参数

将密钥和证书添加到该参数中的最简单方法是先将其读取到 vsql 变量中。您可以通过 Linux shell 使用左引号读取文件内容来获取它们的内容。然后,设置变量中的会话参数。在设置会话参数之前,增大 MaxSessionUDParameterSize 会话参数以在会话变量中为密钥和证书添加足够的存储空间。它们可以大于会话变量的默认大小限制(1000 字节)。

以下示例将从名为 /home/dbadmin/SSL 的目录中读取服务器密钥和证书以及根 CA。由于服务器的密钥密码未保存在文件中,该示例会在运行 vsql 之前在名为 KVERTICA_PASS 的 Linux 环境变量中设置该密码。该示例会在设置会话变量之前先将 MaxSessionUDParameterSize 设为 100000。最后,它会为 Kafka 连接启用 TLS,并从名为 test 的主题流式传输数据。

$ export KVERTICA_PASS=server_key_password
$ vsql
Password:
Welcome to vsql, the Vertica Analytic Database interactive terminal.

Type:  \h or \? for help with vsql commands
       \g or terminate with semicolon to execute query
       \q to quit

SSL connection (cipher: DHE-RSA-AES256-GCM-SHA384, bits: 256, protocol: TLSv1.2)

=> \set cert '\''`cat /home/dbadmin/SSL/server.crt`'\''
=> \set pkey '\''`cat /home/dbadmin/SSL/server.key`'\''
=> \set ca '\''`cat /home/dbadmin/SSL/root.crt`'\''
=> \set pass '\''`echo $KVERTICA_PASS`'\''
=> alter session set MaxSessionUDParameterSize=100000;
ALTER SESSION
=> ALTER SESSION SET UDPARAMETER kafka_SSL_Certificate=:cert;
ALTER SESSION
=> ALTER SESSION SET UDPARAMETER kafka_SSL_PrivateKey_secret=:pkey;
ALTER SESSION
=> ALTER SESSION SET UDPARAMETER kafka_SSL_PrivateKeyPassword_secret=:pass;
ALTER SESSION
=> ALTER SESSION SET UDPARAMETER kafka_SSL_CA=:ca;
ALTER SESSION
=> ALTER SESSION SET UDPARAMETER kafka_Enable_SSL=1;
ALTER SESSION
=> CREATE TABLE t (a VARCHAR);
CREATE TABLE
=> COPY t SOURCE KafkaSource(brokers='kafka01.mycompany.com:9093',
                             stream='test|0|-2', stop_on_eof=true,
                             duration=interval '5 seconds')
          PARSER KafkaParser();
 Rows Loaded
-------------
           3
(1 row)

=> SELECT * FROM t;
                            a
---------------------------------------------------------
 test again
 More testing. These messages seem to be getting through!
 test

(3 rows)

配置调度程序

配置的最后一部分是将调度程序设置为在与 Kafka(以及 Vertica,可选)通信时使用 SSL。当调度程序运行 COPY 命令从 Kafka 获取数据时,它会使用自己的密钥和证书对 Kafka 进行身份验证。如果选择让调度程序使用 TLS/SSL 连接到 Vertica,则它可以重复使用相同的密钥库和信任库建立此连接。

为调度程序创建信任库和密钥库

由于调度程序是一个单独的组件,因此它必须具有自己的密钥和证书。调度程序在 Java 中运行并使用 JDBC 接口连接到 Vertica。因此,必须为它创建在建立与 Vertica 的 TLS 加密连接时使用的密钥库(当 ssl.client.auth=required 时)和信任库。

请记住,如果 Kafka 服务器将 ssl.client.auth 设为 nonerequested,则创建密钥库是可选的。

此过程类似于为 Kafka 代理创建信任库和密钥库。主要区别在于使用 keytool-dname 选项将密钥的公用名称 (CN) 设为域通配符。使用此设置可使密钥和证书与网络中的任何主机匹配。如果在不同的服务器上运行多个调度程序以提供冗余,则此选项特别有用。无论调度程序在域中的哪个服务器上运行,它们都可以使用相同的密钥和证书。

  1. 为调度程序创建信任库文件。添加已用于对 Kafka 群集和 Vertica 群集的密钥库进行签名的 CA 证书。如果使用多个 CA 对证书进行签名,请添加已使用的所有 CA。

    $ keytool -keystore scheduler.truststore.jks -alias CARoot -import \
                   -file root.crt
    Enter keystore password: some_password
    Re-enter new password: some_password
    Owner: EMAILADDRESS=myemail@mycompany.com, CN=*.mycompany.com, O=MyCompany, L=Cambridge, ST=MA, C=US
    Issuer: EMAILADDRESS=myemail@mycompany.com, CN=*.mycompany.com, O=MyCompany, L=Cambridge, ST=MA, C=US
    Serial number: c3f02e87707d01aa
    Valid from: Fri Mar 22 13:37:37 EDT 2019 until: Sun Apr 21 13:37:37 EDT 2019
    Certificate fingerprints:
             MD5:  73:B1:87:87:7B:FE:F1:6E:94:55:FD:AF:5D:D0:C3:0C
             SHA1: C0:69:1C:93:54:21:87:C7:03:93:FE:39:45:66:DE:22:18:7E:CD:94
             SHA256: 23:03:BB:B7:10:12:50:D9:C5:D0:B7:58:97:41:1E:0F:25:A0:DB:
                     D0:1E:7D:F9:6E:60:8F:79:A6:1C:3F:DD:D5
    Signature algorithm name: SHA256withRSA
    Subject Public Key Algorithm: 2048-bit RSA key
    Version: 3
    
    Extensions:
    
    #1: ObjectId: 2.5.29.35 Criticality=false
    AuthorityKeyIdentifier [
    KeyIdentifier [
    0000: 50 69 11 64 45 E9 CC C5   09 EE 26 B5 3E 71 39 7C  Pi.dE.....&.>q9.
    0010: E5 3D 78 16                                        .=x.
    ]
    ]
    
    #2: ObjectId: 2.5.29.19 Criticality=false
    BasicConstraints:[
      CA:true
      PathLen:2147483647
    ]
    
    #3: ObjectId: 2.5.29.14 Criticality=false
    SubjectKeyIdentifier [
    KeyIdentifier [
    0000: 50 69 11 64 45 E9 CC C5   09 EE 26 B5 3E 71 39 7C  Pi.dE.....&.>q9.
    0010: E5 3D 78 16                                        .=x.
    ]
    ]
    
    Trust this certificate? [no]:  yes
    Certificate was added to keystore
    
  2. 初始化密钥库,从而向其传递作为公用名称的通配符主机名。此命令中的 alias 参数非常重要,因为稍后您会使用它来标识调度程序在创建 SSL 连接时必须使用的密钥:

    keytool -keystore scheduler.keystore.jks -alias vsched -validity 365 -genkey \
            -keyalg RSA  -dname CN=*.mycompany.com
    
  3. 导出调度程序的密钥,以便可以使用根 CA 对其进行签名:

    $ keytool -keystore scheduler.keystore.jks -alias vsched -certreq \
            -file scheduler.unsigned.cert
    
  4. 使用根 CA 对调度程序密钥进行签名:

    $ openssl x509 -req -CA root.crt -CAkey root.key -in scheduler.unsigned.cert \
            -out scheduler.signed.cert -days 365 -CAcreateserial
    
  5. 将调度程序密钥重新导入到密钥库中:

    $ keytool -keystore scheduler.keystore.jks -alias localhost -import -file scheduler.signed.cert
    

设置环境变量 VKCONFIG_JVM_OPTS

您必须将多个设置传递给运行调度程序的 Java 虚拟机 (JVM) 的 JDBC 接口。这些设置会告知 JDBC 驱动程序在哪里找到密钥库和信任库,以及密钥的密码。传递这些设置的最简单方法是设置名为 VKCONFIG_JVM_OPTS 的 Linux 环境变量。启动时,调度程序会检查此环境变量并将其中定义的任何属性传递给 JVM。

您需要设置的属性如下:

  • javax.net.ssl.keystore:要使用的密钥库文件的绝对路径。

  • javax.net.ssl.keyStorePassword:调度程序的密钥密码。

  • javax.net.ssl.trustStore:信任库文件的绝对路径。

用于设置环境变量的 Linux 命令行为:

export VKCONFIG_JVM_OPTS="$VKCONFIG_JVM_OPTS -Djavax.net.ssl.trustStore=/path/to/truststore \
                          -Djavax.net.ssl.keyStore=/path/to/keystore \
                          -Djavax.net.ssl.keyStorePassword=keystore_password"

例如,假设调度程序的信任库和密钥库位于目录 /home/dbadmin/SSL 中。那么您可以使用以下命令设置 VKCONFIG_JVM_OPTS 变量:

$ export VKCONFIG_JVM_OPTS="$VKCONFIG_JVM_OPTS \
                           -Djavax.net.ssl.trustStore=/home/dbadmin/SSL/scheduler.truststore.jks \
                           -Djavax.net.ssl.keyStore=/home/dbadmin/SSL/scheduler.keystore.jks \
                           -Djavax.net.ssl.keyStorePassword=key_password"

要确保始终设置此变量,请将该命令添加到运行调度程序的用户帐户的 ~/.bashrc 或其他启动文件中。

如果需要在 JDBC 连接到 Vertica 时使用 TLS,请将 TLSmode=require 添加到调度程序使用的 JDBC URL 中。添加此参数的最简单方法是使用调度程序的 --jdbc-url 选项。假设为调度程序使用了配置文件,则可以将以下行添加到配置文件中:

--jdbc-url=jdbc:vertica://VerticaHost:portNumber/databaseName?user=username&password=password&TLSmode=require

有关将 JDBC 与 Vertica 结合使用的详细信息,请参阅 Java

在调度程序配置中启用 TLS

最后,启用 TLS。每次运行 vkconfig 时,都必须向它传递以下选项:

--enable-ssl
true,允许调度程序在连接到 Kafka 时使用 SSL。
--ssl-ca-alias
用于对 Kafka 代理的密钥进行签名的 CA 的别名。此别名必须与您提供给 keytool 命令的 -alias 实参值相匹配才能将 CA 导入到信任库中。
--ssl-key-alias
已分配给调度密钥的别名。此值必须与您在创建调度程序的密钥库时提供给 keytool 命令的 -alias 值相匹配。
--ssl-key-password
调度程序密钥的密码。

有关这些选项的详细信息,请参阅常用 vkconfig 脚本选项。出于便利性和安全考虑,请将这些选项添加到传递给 vkconfig 的配置文件中。否则,您将面临通过进程列表暴露密钥密码的风险,因为同一系统上的其他用户可以查看该进程列表。有关设置配置文件的详细信息,请参阅配置文件格式

将以下内容添加到调度程序配置文件中,以允许它在连接到 Vertica 时使用密钥库和信任库并启用 TLS:

enable-ssl=true
ssl-ca-alias=CAroot
ssl-key-alias=vsched
ssl-key-password=vertica
jdbc-url=jdbc:vertica://VerticaHost:portNumber/databaseName?user=username&password=password&TLSmode=require

启动调度程序

将调度程序配置为使用 SSL 后,启动调度程序并验证它是否可以加载数据。例如,要使用名为 weblog.conf 的配置文件启动调度程序,请使用以下命令:

$ nohup vkconfig launch --conf weblog.conf >/dev/null 2>&1 &

6.5 - 对 Kafka TLS/SSL 连接问题进行故障排除

将 Vertica、Kafka 和调度程序配置为使用 TLS/SSL 身份验证和加密后,可能会遇到数据流式传输问题。此部分介绍您可能会遇到的一些常见错误,以及如何对这些错误进行故障排除。

启动调度程序时出错

启动调度程序时,可能会显示如下错误:

$ vkconfig launch --conf weblog.conf
java.sql.SQLNonTransientException: com.vertica.solutions.kafka.exception.ConfigurationException:
       No keystore system property found: null
    at com.vertica.solutions.kafka.util.SQLUtilities.getConnection(SQLUtilities.java:181)
    at com.vertica.solutions.kafka.cli.CLIUtil.assertDBConnectionWorks(CLIUtil.java:40)
    at com.vertica.solutions.kafka.Launcher.run(Launcher.java:135)
    at com.vertica.solutions.kafka.Launcher.main(Launcher.java:263)
Caused by: com.vertica.solutions.kafka.exception.ConfigurationException: No keystore system property found: null
    at com.vertica.solutions.kafka.security.KeyStoreUtil.loadStore(KeyStoreUtil.java:77)
    at com.vertica.solutions.kafka.security.KeyStoreUtil.<init>(KeyStoreUtil.java:42)
    at com.vertica.solutions.kafka.util.SQLUtilities.getConnection(SQLUtilities.java:179)
    ... 3 more

当调度程序找不到或无法读取密钥库或信任库文件时,就会引发这些错误。要解决此问题,请执行以下操作:

  • 验证是否已设置 VKCONFIG_JVM_OPTS Linux 环境变量。如果没有此变量,调度程序将不知道在何处找到创建 TLS/SSL 连接时要使用的信任库和密钥库。请参阅步骤 2:设置 VKCONFIG_JVM_OPTS 环境变量以了解详细信息。

  • 验证密钥库和信任库文件是否位于在 VKCONFIG_JVM_OPTS 环境变量中设置的路径中。

  • 验证运行调度程序的用户帐户是否对信任库和密钥库文件具有读取权限。

  • 验证在调度程序配置中提供的密钥密码是否正确。请注意,必须提供密钥的密码,而非密钥库。

另一个可能的错误消息是无法设置 TLS 密钥库:

Exception in thread "main" java.sql.SQLRecoverableException: [Vertica][VJDBC](100024) IOException while communicating with server: java.io.IOException: Failed to create an SSLSocketFactory when setting up TLS: keystore not found.
        at com.vertica.io.ProtocolStream.logAndConvertToNetworkException(Unknown Source)
        at com.vertica.io.ProtocolStream.enableSSL(Unknown Source)
        at com.vertica.io.ProtocolStream.initSession(Unknown Source)
        at com.vertica.core.VConnection.tryConnect(Unknown Source)
        at com.vertica.core.VConnection.connect(Unknown Source)
        . . .

使用非 JKS 格式的密钥库或信任库文件以及未提供正确的文件扩展名可能会导致此错误。如果调度程序无法识别密钥库或信任库文件名的文件扩展名,它会假设该文件采用 JKS 格式。如果该文件不是采用此格式,调度程序将退出并显示如上所示的错误消息。要更正此错误,请重命名密钥库和信任库文件,以使用正确的文件扩展名。例如,如果文件采用 PKCS 12 文件格式,请将其文件扩展名更改为 .p12.pks

数据不加载

如果您发现调度程序未将数据加载到数据库,应先查询 stream_microbatch_history 表以确定调度程序是否正在执行微批处理。如果是,请记录其结果。错误的 TLS/SSL 配置通常会导致出现 NETWORK_ISSUE 状态:

=> SELECT frame_start, end_reason, end_reason_message FROM weblog_sched.stream_microbatch_history;
       frame_start       |  end_reason   | end_reason_message
-------------------------+---------------+--------------------
 2019-04-05 11:35:18.365 | NETWORK_ISSUE |
 2019-04-05 11:35:38.462 | NETWORK_ISSUE |

如果您怀疑是 SSL 问题,可通过查看 Kafka 的 server.log 文件来验证 Vertica 是否正在建立与 Kafka 的连接。失败的 SSL 连接尝试会显示在此日志中,如以下示例所示:

java.io.IOException: Unexpected status returned by SSLEngine.wrap, expected
        CLOSED, received OK. Will not send close message to peer.
        at org.apache.kafka.common.network.SslTransportLayer.close(SslTransportLayer.java:172)
        at org.apache.kafka.common.utils.Utils.closeAll(Utils.java:703)
        at org.apache.kafka.common.network.KafkaChannel.close(KafkaChannel.java:61)
        at org.apache.kafka.common.network.Selector.doClose(Selector.java:739)
        at org.apache.kafka.common.network.Selector.close(Selector.java:727)
        at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:520)
        at org.apache.kafka.common.network.Selector.poll(Selector.java:412)
        at kafka.network.Processor.poll(SocketServer.scala:551)
        at kafka.network.Processor.run(SocketServer.scala:468)
        at java.lang.Thread.run(Thread.java:748)

如果未看到此类错误,则可能是 Kafka 与 Vertica 之间存在网络问题。如果确实看到这些错误,请考虑执行以下调试步骤:

  • 验证 Kafka 群集的配置是否一致。例如,如果将某些 Kafka 节点设置为需要进行客户端身份验证,而其他节点不需要,则可能会看到连接错误。

  • 验证证书和密钥中的公用名称 (CN) 是否与系统的主机名相匹配。

  • 验证 Kafka 群集是否接受使用在 server.properties 文件的侦听器属性中指定的端口和主机名建立的连接。例如,假设在此设置中使用 IP 地址,但在调度程序的配置中定义群集时使用主机名。那么,Kafka 可能会拒绝 Vertica 进行的连接尝试,或者 Vertica 可能会拒绝 Kafka 节点的身份。

  • 如果在 Kafka 中使用客户端身份验证,请尝试关闭它以查看调度程序是否可以连接。如果禁用身份验证后,调度程序可以流式传输数据,则可以证明该问题与客户端身份验证无关。在这种情况下,请查看 Kafka 群集和调度程序的证书与 CA。确保信任库包括用于对密钥进行签名的所有 CA,直到并包括根 CA。

Avro 架构注册表和 KafkaAvroParser

要在 Avro 架构注册表与 Vertica 之间创建 TLS 连接,KafkaAvroParser 至少需要以下参数:

  • schema_registry_url (对于 https 方案)

  • schema_registry_ssl_ca_path

当且仅当 TLS 访问失败时,请根据以下对象确定 Vertica 需要哪些 TLS 架构注册表信息:

  • 证书颁发机构 (CA)

  • TLS 服务器证书

  • TLS 密钥

仅使用 KafkaAvroParser 参数提供必需的 TLS 架构注册表信息。必须能够在处理 Avro 数据的每个 Vertica 节点的文件系统中访问 TLS 信息。

以下示例显示如何将这些参数传递给 KafkaAvroParser:

KafkaAvroParser(
    schema_registry_url='https://localhost:8081'
    schema_registry_ssl_ca_path='path/to/certificate-authority',
    schema_registry_ssl_cert_path='path/to/tls-server-certificate',
    schema_registry_ssl_key_path='path/to/private-tls-key',
    schema_registry_ssl_key_password_path='path/to/private-tls-key-password'
)

7 - 使用 SASL 向 Kafka 进行身份验证

Kafka 支持使用简单身份验证和安全层 (SASL) 来对生产者和使用者进行身份验证。在使用大多数 Kafka 相关函数(如 KafkaSource)时,都可以使用 SASL 向 Kafka 进行 Vertica 身份验证。

Vertica 支持通过以下身份验证机制使用 SASL_PLAINTEXT 和 SASL_SSL 协议:

  • PLAIN

  • SCRAM-SHA-256

  • SCRAM-SHA-512

必须将 Kafka 群集配置为启用 SASL 身份验证。请参阅 Kafka 版本对应的 Kafka 文档以了解如何配置 SASL 身份验证。

要在 Vertica 和 Kafka 之间使用 SASL 身份验证,请直接在 rdkafka 库中使用 kafka_conf 参数设置 SASL 相关配置选项。Vertica 使用此库来连接 Kafka。有关在 rdkafka 库中直接设置配置选项的详细信息,请参阅直接设置 Kafka 库选项

相关配置选项有:

  • security.protocol 设置用于向 Kafka 进行身份验证的安全协议。

  • sasl.mechanism 设置安全机制。

  • sasl.username 设置要用于进行身份验证的 SASL 用户。

  • sasl.password 设置要用于进行 SASL 身份验证的密码。

有关所有 SASL 相关设置的列表,请参阅 rdkafka 配置文档

以下示例演示了如何使用 SASL_PLAINTEXT 安全协议调用 KafkaCheckBrokers

=> SELECT KafkaCheckBrokers(USING PARAMETERS
      brokers='kafka01.example.com:9092',
      kafka_conf='{"sasl.username":"dbadmin", "sasl.password":"pword", "sasl.mechanism":"PLAIN", "security.protocol":"SASL_PLAINTEXT"}'
   ) OVER ();

此示例演示了在通过 SSL 连接从 Kafka 复制数据时如何使用 SASL 身份验证。此示例假定 Vertica 和 Kafka 已经配置为使用 TLS/SSL 加密,如使用 Kafka 进行 TLS/SSL 加密所述:

=> COPY mytopic_table
      SOURCE KafkaSource(
        stream='mytopic|0|-2',
        brokers='kafka01.example.com:9092',
        stop_on_eof=true,
        kafka_conf='{"sasl.username":"dbadmin", "sasl.password":"pword", "sasl.mechanism":"PLAIN", "security.protocol":"SASL_SSL"}'
      )
      FILTER KafkaInsertDelimiters(delimiter = E'\n')
      DELIMITER ','
      ENCLOSED BY '"';

有关在 rfkafka 库中使用 SASL 的详细信息,请参阅 rdkafka github 站点上的将 SASL 与 librdkafka 结合使用

8 - 排除 Kafka 集成问题

以下主题可帮助您排除 Vertica 与 Apache Kafka 的集成问题。

8.1 - 使用 kafkacat 对 Kafka 集成问题进行故障排除

Kafkacat 是第三方开源实用程序,可用于从 Linux 命令行连接到 Kafka。它使用 Vertica 与 Apache Kafka 的集成用于连接到 Kafka 的相同底层库。这一共享库使得 kafkcat 成为一款测试和调试 Vertica 与 Kafka 的集成的有用工具。

在以下方面,您可能会发现 kafkacat 非常有用:

  • 测试 Vertica 与 Kafka 群集之间的连接。

  • 检查 Kafka 数据是否存在可能会阻止其中某些数据加载到 Vertica 中的异常。

  • 生成数据以供测试加载到 Vertica 中。

  • 列出有关 Kafka 主题的详细信息。

有关 kafkacat 的详细信息,请参阅其在 Github 上的项目页面

在 Vertica 节点上运行 kafkacat

Kafkacat 实用程序已捆绑到 Vertica 安装包中,因此可以在 Vertica 群集的所有节点的 /opt/vertica/packages/kafka/bin 目录中找到它。这是包含 vkconfig 实用程序的同一目录,因此如果已将它添加到路径中,则无需指定其完整路径即可使用 kafkacat 实用程序。否则,可以使用以下命令将此路径添加到 shell 的环境变量中:

set PATH=/opt/vertica/packages/kafka/bin:$PATH

执行不含任何实参的 kafkacat 会为您显示基本的帮助消息:

$ kafkacat
Error: -b <broker,..> missing

Usage: kafkacat <options> [file1 file2 .. | topic1 topic2 ..]]
kafkacat - Apache Kafka producer and consumer tool
https://github.com/edenhill/kafkacat
Copyright (c) 2014-2015, Magnus Edenhill
Version releases/VER_8_1_RELEASE_BUILD_1_555_20170615-4931-g3fb918 (librdkafka releases/VER_8_1_RELEASE_BUILD_1_555_20170615-4931-g3fb918)


General options:
  -C | -P | -L       Mode: Consume, Produce or metadata List
  -G <group-id>      Mode: High-level KafkaConsumer (Kafka 0.9 balanced consumer groups)
                     Expects a list of topics to subscribe to
  -t <topic>         Topic to consume from, produce to, or list
  -p <partition>     Partition
  -b <brokers,..>    Bootstrap broker(s) (host[:port])
  -D <delim>         Message delimiter character:
                     a-z.. | \r | \n | \t | \xNN
                     Default: \n
  -K <delim>         Key delimiter (same format as -D)
  -c <cnt>           Limit message count
  -X list            List available librdkafka configuration properties
  -X prop=val        Set librdkafka configuration property.
                     Properties prefixed with "topic." are
                     applied as topic properties.
  -X dump            Dump configuration and exit.
  -d <dbg1,...>      Enable librdkafka debugging:
                     all,generic,broker,topic,metadata,queue,msg,protocol,cgrp,security,fetch,feature
  -q                 Be quiet (verbosity set to 0)
  -v                 Increase verbosity
  -V                 Print version

Producer options:
  -z snappy|gzip     Message compression. Default: none
  -p -1              Use random partitioner
  -D <delim>         Delimiter to split input into messages
  -K <delim>         Delimiter to split input key and message
  -l                 Send messages from a file separated by
                     delimiter, as with stdin.
                     (only one file allowed)
  -T                 Output sent messages to stdout, acting like tee.
  -c <cnt>           Exit after producing this number of messages
  -Z                 Send empty messages as NULL messages
  file1 file2..      Read messages from files.
                     With -l, only one file permitted.
                     Otherwise, the entire file contents will
                     be sent as one single message.

Consumer options:
  -o <offset>        Offset to start consuming from:
                     beginning | end | stored |
                     <value>  (absolute offset) |
                     -<value> (relative offset from end)
  -e                 Exit successfully when last message received
  -f <fmt..>         Output formatting string, see below.
                     Takes precedence over -D and -K.
  -D <delim>         Delimiter to separate messages on output
  -K <delim>         Print message keys prefixing the message
                     with specified delimiter.
  -O                 Print message offset using -K delimiter
  -c <cnt>           Exit after consuming this number of messages
  -Z                 Print NULL messages and keys as "NULL"(instead of empty)
  -u                 Unbuffered output

Metadata options:
  -t <topic>         Topic to query (optional)


Format string tokens:
  %s                 Message payload
  %S                 Message payload length (or -1 for NULL)
  %R                 Message payload length (or -1 for NULL) serialized
                     as a binary big endian 32-bit signed integer
  %k                 Message key
  %K                 Message key length (or -1 for NULL)
  %t                 Topic
  %p                 Partition
  %o                 Message offset
  \n \r \t           Newlines, tab
  \xXX \xNNN         Any ASCII character
 Example:
  -f 'Topic %t [%p] at offset %o: key %k: %s\n'


Consumer mode (writes messages to stdout):
  kafkacat -b <broker> -t <topic> -p <partition>
 or:
  kafkacat -C -b ...

High-level KafkaConsumer mode:
  kafkacat -b <broker> -G <group-id> topic1 top2 ^aregex\d+

Producer mode (reads messages from stdin):
  ... | kafkacat -b <broker> -t <topic> -p <partition>
 or:
  kafkacat -P -b ...

Metadata listing:
  kafkacat -L -b <broker> [-t <topic>]

测试与 Kafka 群集的连接并获取元数据

验证 Vertica 节点是否可以连接到 Kafka 群集是经常需要执行的一项基本故障排除步骤。成功执行几乎任何 kafkacat 命令证明您登录的 Vertica 节点能够访问 Kafka 群集。验证连接时,可以执行的一个简单命令是获取 Kafka 群集已定义的所有主题的元数据。以下示例演示了如何使用 kafkacat 的元数据列出命令连接到在端口 6667(由 Hortonworks Hadoop 群集使用的 Kafka 代理端口)上运行的名为 kafka01 的代理。

$ kafkacat -L -b kafka01:6667
Metadata for all topics (from broker -1: kafka01:6667/bootstrap):
 2 brokers:
  broker 1001 at kafka03.example.com:6667
  broker 1002 at kafka01.example.com:6667
 4 topics:
  topic "iot-data" with 3 partitions:
    partition 2, leader 1002, replicas: 1002, isrs: 1002
    partition 1, leader 1001, replicas: 1001, isrs: 1001
    partition 0, leader 1002, replicas: 1002, isrs: 1002
  topic "__consumer_offsets" with 50 partitions:
    partition 23, leader 1001, replicas: 1002,1001, isrs: 1001,1002
    partition 41, leader 1001, replicas: 1002,1001, isrs: 1001,1002
    partition 32, leader 1002, replicas: 1002,1001, isrs: 1001,1002
    partition 8, leader 1002, replicas: 1002,1001, isrs: 1001,1002
    partition 17, leader 1001, replicas: 1002,1001, isrs: 1001,1002
    partition 44, leader 1002, replicas: 1002,1001, isrs: 1001,1002
    partition 35, leader 1001, replicas: 1002,1001, isrs: 1001,1002
    partition 26, leader 1002, replicas: 1002,1001, isrs: 1001,1002
    partition 11, leader 1001, replicas: 1002,1001, isrs: 1001,1002
    partition 29, leader 1001, replicas: 1002,1001, isrs: 1001,1002
    partition 38, leader 1002, replicas: 1002,1001, isrs: 1001,1002
    partition 47, leader 1001, replicas: 1002,1001, isrs: 1001,1002
    partition 20, leader 1002, replicas: 1002,1001, isrs: 1001,1002
    partition 2, leader 1002, replicas: 1002,1001, isrs: 1001,1002
    partition 5, leader 1001, replicas: 1002,1001, isrs: 1001,1002
    partition 14, leader 1002, replicas: 1002,1001, isrs: 1001,1002
    partition 46, leader 1002, replicas: 1002,1001, isrs: 1001,1002
    partition 49, leader 1001, replicas: 1002,1001, isrs: 1001,1002
    partition 40, leader 1002, replicas: 1002,1001, isrs: 1001,1002
    partition 4, leader 1002, replicas: 1002,1001, isrs: 1001,1002
    partition 13, leader 1001, replicas: 1002,1001, isrs: 1001,1002
    partition 22, leader 1002, replicas: 1002,1001, isrs: 1001,1002
    partition 31, leader 1001, replicas: 1002,1001, isrs: 1001,1002
    partition 16, leader 1002, replicas: 1002,1001, isrs: 1001,1002
    partition 7, leader 1001, replicas: 1002,1001, isrs: 1001,1002
    partition 43, leader 1001, replicas: 1002,1001, isrs: 1001,1002
    partition 25, leader 1001, replicas: 1002,1001, isrs: 1001,1002
    partition 34, leader 1002, replicas: 1002,1001, isrs: 1001,1002
    partition 10, leader 1002, replicas: 1002,1001, isrs: 1001,1002
    partition 37, leader 1001, replicas: 1002,1001, isrs: 1001,1002
    partition 1, leader 1001, replicas: 1002,1001, isrs: 1001,1002
    partition 19, leader 1001, replicas: 1002,1001, isrs: 1001,1002
    partition 28, leader 1002, replicas: 1002,1001, isrs: 1001,1002
    partition 45, leader 1001, replicas: 1002,1001, isrs: 1001,1002
    partition 36, leader 1002, replicas: 1002,1001, isrs: 1001,1002
    partition 27, leader 1001, replicas: 1002,1001, isrs: 1001,1002
    partition 9, leader 1001, replicas: 1002,1001, isrs: 1001,1002
    partition 18, leader 1002, replicas: 1002,1001, isrs: 1001,1002
    partition 21, leader 1001, replicas: 1002,1001, isrs: 1001,1002
    partition 48, leader 1002, replicas: 1002,1001, isrs: 1001,1002
    partition 12, leader 1002, replicas: 1002,1001, isrs: 1001,1002
    partition 3, leader 1001, replicas: 1002,1001, isrs: 1001,1002
    partition 30, leader 1002, replicas: 1002,1001, isrs: 1001,1002
    partition 39, leader 1001, replicas: 1002,1001, isrs: 1001,1002
    partition 15, leader 1001, replicas: 1002,1001, isrs: 1001,1002
    partition 42, leader 1002, replicas: 1002,1001, isrs: 1001,1002
    partition 24, leader 1002, replicas: 1002,1001, isrs: 1001,1002
    partition 33, leader 1001, replicas: 1002,1001, isrs: 1001,1002
    partition 6, leader 1002, replicas: 1002,1001, isrs: 1001,1002
    partition 0, leader 1002, replicas: 1002,1001, isrs: 1001,1002
  topic "web_hits" with 1 partitions:
    partition 0, leader 1001, replicas: 1001, isrs: 1001
  topic "ambari_kafka_service_check" with 1 partitions:
    partition 0, leader 1002, replicas: 1002, isrs: 1002

此外,还可以使用此输出来验证由 Kafka 群集定义的主题,以及每个主题定义的分区数。在 Kafka 与 Vertica 之间复制数据时需要提供此信息。

从 Kafka 主题中检索消息

在 Vertica 中对从 Kafka 流式传输消息相关的问题进行故障排除时,您通常希望查看 Kafka 已发送的原始数据。例如,您可能希望验证消息是否采用预期格式。或者,您可能希望查看特定消息,以确认其中一些消息是否未采用 Vertica 能够解析的正确格式。您可以使用 kafkacat 通过其使用命令 (-C) 从主题中读取消息。至少必须向 kafkacat 传递理(-b 实参)以及要从其读取消息的主题 (-t)。此外,还可以选择从特定偏移量 (-o) 和分区 (-p) 读取消息。您通常还希望 kafkacat 在完成数据读取后退出 (-e),而不是继续等待读取更多消息。

以下示例将获取名为 web_hits 的主题中的最后一条消息。offset 实参将使用负值,从而告知 kafkacat 从主题的结尾处读取消息。

$ kafkacat -C -b kafka01:6667 -t web_hits -o -1 -e
{"url": "wp-content/list/search.php", "ip": "132.74.240.52",
"date": "2018/03/28 14:12:34",
"user_agent": "Mozilla/5.0 (iPod; U; CPU iPhone OS 4_2 like
Mac OS X; sl-SI) AppleWebKit/532.22.4 (KHTML, like Gecko) Version/3.0.5
Mobile/8B117 Safari/6532.22.4"}
% Reached end of topic web_hits [0] at offset 54932: exiting

还可以通过指定偏移量和限值(-c 实参)来读取特定范围的消息。例如,您可能希望查看特定范围的数据以确定 Vertica 无法加载数据的原因。以下示例将从偏移量 3280 处开始读取主题 iot-data 中的 10 条消息:

$ kafkacat -C -b kafka01:6667 -t iot-data -o 3280 -c 10 -e
63680, 19, 24.439323, 26.0128725
43510, 71, 162.319805, -37.4924025
91113, 53, 139.764857, -74.735731
88508, 14, -85.821967, -7.236280
20419, 31, -129.583988, 13.995481
79365, 79, 153.184594, 51.1583485
26283, 25, -168.911020, 35.331027
32111, 56, -157.930451, 82.2676385
56808, 17, 19.603286, -0.7698495
9118, 73, 97.365445, -74.8593245

为 Kafka 主题生成数据

如果准备从尚未处于活动状态的 Kafka 主题流式传输数据,可能需要一种流式传输测试消息的方法。然后,可以验证该主题的消息是否已加载到 Vertica 中,而不必担心会丢失实际数据。

要将数据发送到 Kafka,请使用 kafkacat 的生产命令 (-P)。向它提供消息的最简单方法是通过 STDIN 使用管道传输消息,每行一条消息。您可以为数据选择一个特定的分区,或者通过将分区编号设置为 -1 让 kafkacat 将每条消息随机分配到一个随机分区。例如,假设您有一个名为 iot-data.csv 的文件,并且希望将其生成到名为 iot-data 的 Kafka 主题的随机分区中。那么,您可以使用以下命令:

$ cat iot_data.csv | kafkacat -P -p -1 -b kafka01:6667 -t iot-data

8.2 - 排除加载速度缓慢问题

下面列出了一些可能导致消息从 Kafka 加载缓慢的潜在问题。

验证在与 Kafka 0.9 或更早版本通信时是否已禁用 API 版本检查

如果 Kafka 群集运行的是 0.9 或更早版本,请确保已禁用 rdkafka 库的 api.version.request 选项。否则,与 Kafka 的每个 Vertica 连接都会暂停 10 秒,直到 API 版本请求超时。根据加载的时间范围大小或其他超时设置,此延迟可能会减少数据加载的吞吐量。它甚至可能完全阻止消息加载。有关详细信息,请参阅为 Apache Kafka 0.9 及更早版本配置 Vertica

更新在 Vertica 版本 9.1.1 之前创建的调度程序的 message-max-bytes 设置

如果在升级到 Vertica 9.1.1 或更高版本后发现调度程序的数据加载速度缓慢,请考虑更新其加载规范的 message-max-bytes 参数。该设置的含义在 Kafka 版本 0.11 中已发生更改。有关详细信息,请参阅在 Kafka 0.11 及更高版本中对 message.max.bytes 设置的更改

Eon 模式和云延迟

Eon 模式将 Vertica 群集中的计算与存储分开,这可能会在 Vertica 加载和保存数据时导致少量延迟。云计算基础设施也可能导致延迟。这种延迟会侵占调度程序的时间范围持续时间,导致它们在每个时间范围中加载更少的数据。因此,在 Eon 模式数据库中从 Kafka 加载数据时,应该考虑增加时间范围持续时间。有关详细信息,请参阅Vertica Eon 模式和 Kafka

8.3 - 排除消息丢失问题

发出大量数据的 Kafka 生产者可能会使 Vertica 不堪重负,可能导致消息在调度程序将消息加载到 Vertica 之前便在 Kafka 中过期。在 Vertica 对加载的消息执行额外处理(例如文本索引)时,这种情况更为常见。

如果您发现缺少的消息来自具有多个分区的主题,请考虑配置 --max-parallelism微批处理实用程序选项--max-parallelism 选项将一个微批处理拆分为多个子集微批处理。这使您可以使用调度程序资源池中提供的 PLANNEDCONCURRENCY 来创建更多调度程序线程,以同时加载单个微批处理。每个节点都使用资源池 EXECUTIONPARALLELISM 设置来确定为处理分区而创建的线程数。因为 EXECUTIONPARALLELISM 线程按调度程序线程创建,所以为每个微批处理使用更多的 PLANNEDCONCURRENCY 可让您为单个工作单元并行处理更多分区。

有关详细信息,请参阅管理调度程序资源和性能

9 - vkconfig 脚本选项

Vertica 包含可用于配置调度程序的 vkconfig 脚本。此脚本包含多个可用于设置调度程序中的多组选项的工具,以及启动和关闭调度程序的说明。在调用 vkconfig 脚本时,您可以提供想要使用的工具作为第一个实参。

此部分中的主题介绍 vkconfig 脚本中提供的每个工具及其选项。您可以将常用 vkconfig 脚本选项主题中的选项与任何实用程序配合使用。各实用程序的表中列出了该实用程序专用的选项。

9.1 - 常用 vkconfig 脚本选项

可以在 vkconfig 脚本中提供的不同工具中找到这些选项。

--conffilename
包含 vkconfig 脚本配置选项的文本文件。请参阅下面的配置文件格式
--config-schemaschema_name
调度程序的 Vertica 架构的名称。此值与调度程序的名称相同。可以在配置期间使用此名称标识调度程序。

默认值:

stream_config

--dbhosthost name
充当调度程序启动程序节点的 Vertica 节点的主机名或 IP 地址。

默认值:

localhost

--dbport port_number
用于连接到 Vertica 数据库的端口。

默认值:

5433

--enable-ssl
支持 vkconfig 脚本使用 SSL 连接到 Vertica 或在 Vertica 与 Kafka 之间使用 SSL。有关详细信息,请参阅为调度程序配置 TLS 连接
--help
列显帮助菜单,其中会列出可用选项及其描述。
--jdbc-optoption=value [&option2=value2...]
一个或多个要添加到 vkconfig 用于连接到 Vertica 的标准 JDBC URL 的选项。不能与 --jdbc-url 配合使用。
--jdbc-urlurl
vkconfig 用于连接到 Vertica 的完整 JDBC URL,而不是标准 JDBC URL 字符串。
--passwordpassword
数据库用户的密码。
--ssl-ca-aliasalias_name
信任库中根证书颁发机构的别名。设置后,调度程序将仅加载与指定别名关联的证书。忽略后,调度程序会将所有证书都加载到信任库中。
--ssl-key-aliasalias_name
密钥库中密钥和证书对的别名。Vertica 使用 SSL 连接到 Kafka 时必须设置该别名。
--ssl-key-passwordpassword
SSL 密钥的密码。Vertica 使用 SSL 连接到 Kafka 时必须设置该别名。
--usernameusername
用于修改调度程序配置的 Vertica 数据库用户。此用户必须对调度程序的架构具有创建权限。

默认值:

当前用户

--version
显示调度程序的版本号。

配置文件格式

您可以使用配置文件来存储在调用 vkconfig 实用程序时使用的常用参数。配置文件是文本文件,每行包含一个选项设置,格式如下:

option=value

此外,还可以在选项文件中添加注释,方法是在注释前加上井号 (#) 前缀。

#config.properties:
username=myuser
password=mypassword
dbhost=localhost
dbport=5433

可以使用 --conf 选项告知 vkconfig 使用配置文件:

$ /opt/vertica/packages/kafka/bin/vkconfig source --update --conf config.properties

您可以从命令行覆盖所有已存储的参数:

$ /opt/vertica/packages/kafka/bin/vkconfig source --update --conf config.properties --dbhost otherVerticaHost

示例

以下示例显示如何使用共享的实用程序选项。

显示调度程序实用程序的帮助:

$ vkconfig scheduler --help
This command configures a Scheduler, which can run and load data from configured
sources and clusters into Vertica tables. It provides options for changing the
'frame duration' (time given per set of batches to resolve), as well as the
dedicated Vertica resource pool the Scheduler will use while running.

Available Options:
PARAMETER               #ARGS    DESCRIPTION
conf                    1        Allow the use of a properties file to associate
                                 parameter keys and values. This file enables
                                 command string reuse and cleaner command strings.
help                    0        Outputs a help context for the given subutility.
version                 0        Outputs the current Version of the scheduer.
skip-validation         0        [Depricated] Use --validation-type.
validation-type         1        Determine what happens when there are
                                 configuration errors. Accepts: ERROR - errors
                                 out, WARN - prints out a message and continues,
                                 SKIP - skip running validations
dbhost                  1        The Vertica database hostname that contains
                                 metadata and configuration information. The
                                 default value is 'localhost'.
dbport                  1        The port at the hostname to connect to the
                                 Vertica database. The default value is '5433'.
username                1        The user to connect to Vertica. The default
                                 value is the current system user.
password                1        The password for the user connecting to Vertica.
                                 The default value is empty.
jdbc-url                1        A JDBC URL that can override Vertica connection
                                 parameters and provide additional JDBC options.
jdbc-opt                1        Options to add to the JDBC URL used to connect
                                 to Vertica ('&'-separated key=value list).
                                 Used with generated URL (i.e. not with
                                 '--jdbc-url' set).
enable-ssl              1        Enable SSL between JDBC and Vertica and/or
                                 Vertica and Kafka.
ssl-ca-alias            1        The alias of the root CA within the provided
                                 truststore used when connecting between
                                 Vertica and Kafka.
ssl-key-alias           1        The alias of the key and certificate pair
                                 within the provided keystore used when
                                 connecting between Vertica and Kafka.
ssl-key-password        1        The password for the key used when connecting
                                 between Vertica and Kafka. Should be hidden
                                 with file access (see --conf).
config-schema           1        The schema containing the configuration details
                                 to be used, created or edited. This parameter
                                 defines the scheduler. The default value is
                                 'stream_config'.
create                  0        Create a new instance of the supplied type.
read                    0        Read an instance of the supplied type.
update                  0        Update an instance of the supplied type.
delete                  0        Delete an instance of the supplied type.
drop                    0        Drops the specified configuration schema.
                                 CAUTION: this command will completely delete
                                 and remove all configuration and monitoring
                                 data for the specified scheduler.
dump                    0        Dump the config schema query string used to
                                 answer this command in the output.
operator                1        Specifies a user designated as an operator for
                                 the created configuration. Used with --create.
add-operator            1        Add a user designated as an operator for the
                                 specified configuration. Used with --update.
remove-operator         1        Removes a user designated as an operator for
                                 the specified configuration. Used with
                                 --update.
upgrade                 0        Upgrade the current scheduler configuration
                                 schema to the current version of this
                                 scheduler. WARNING: if upgrading between
                                 EXCAVATOR and FRONTLOADER be aware that the
                                 Scheduler is not backwards compatible. The
                                 upgrade procedure will translate your kafka
                                 model into the new stream model.
upgrade-to-schema       1        Used with upgrade: will upgrade the
                                 configuration to a new given schema instead of
                                 upgrading within the same schema.
fix-config              0        Attempts to fix the configuration (ex: dropped
                                 tables) before doing any other updates. Used
                                 with --update.
frame-duration          1        The duration of the Scheduler's frame, in
                                 which every configured Microbatch runs. Default
                                 is 300 seconds: '00:05:00'
resource-pool           1        The Vertica resource pool to run the Scheduler
                                 on. Default is 'general'.
config-refresh          1        The interval of time between Scheduler
                                 configuration refreshes. Default is 5 minutes:
                                 '00:05'
new-source-policy       1        The policy for new Sources to be scheduled
                                 during a frame. Options are: START, END, and
                                 FAIR. Default is 'FAIR'.
pushback-policy         1
pushback-max-count      1
auto-sync               1        Automatically update configuration based on
                                 metadata from the Kafka cluster
consumer-group-id       1        The Kafka consumer group id to report offsets
                                 to.
eof-timeout-ms          1        [DEPRECATED] This option has no effect.

9.2 - 调度程序工具选项

vkconfig 脚本的调度程序工具可用于配置将数据从 Kafka 不断加载到 Vertica 的调度程序。使用调度程序工具创建、更新或删除由 config-schema 定义的调度程序。如果未指定调度程序,则命令适用于默认 stream_config 调度程序。

语法

vkconfig scheduler {--create | --read | --update | --drop} other_options...
--create

创建新的负载规范,不能与 ‑‑delete‑‑read‑‑update 一起使用。

--read
以 JSON 格式输出调度程序的当前设置。不能与 --create--delete--update 配合使用。
--update

更新现有的 Set Snippet Variable Value in Topic。不能与 --create--delete--read 配合使用。

--drop
删除调度程序的架构。删除其架构将删除调度程序。删除调度程序的架构后,无法将其恢复。
--add-operatoruser_name
授予某个 Vertica 用户帐户或角色使用和修改调度程序的权限。需要使用 --update 共享的实用程序选项。
--auto-sync``{TRUE|FALSE}
如果为 TRUE,Vertica 将以 --config-refresh 中指定的间隔自动同步调度程序源信息。

有关调度程序在每个间隔同步哪些内容的详细信息,请参阅使用调度程序自动使用来自 Kafka 的数据中的“验证调度程序”和“同步调度程序”部分。

默认值: TRUE

--config-refreshHH:MM:SS
调度程序在同步其设置或更新其缓存元数据(例如使用 --update 选项所做的更改)之前运行的时间间隔。

默认值: 00:05:00

--consumer-group-idid_name

Kafka 使用者组(Vertica 将向其报告消费消息进度)的名称。设置此值以禁用向 Kafka 使用者组报告进度。有关详细信息,请参阅通过使用者组监控 Vertica 消息使用情况

默认值: vertica_database-name

--dump

当您将此选项与 --read 选项一起使用时,vkconfig 会输出用来检索数据的 Vertica 查询,而不是输出数据本身。如果您想从 Vertica 中访问数据而无需通过 vkconfig,则此选项很有用。如果不与 --read 一起使用,此选项无效。

--eof-timeout-msnumber of milliseconds
如果 COPY 命令在 eof-timeout-ms 间隔内未收到任何消息,Vertica 会通过结束该 COPY 语句做出响应。

有关详细信息,请参阅手动使用来自 Kafka 的数据

默认值: 1 秒

--fix-config
修复配置并重新创建所有缺失的表。仅在使用 --update 共享的配置选项时有效。
--frame-durationHH:MM:SS
所有各个时间范围在此调度程序中持续的时间间隔。调度程序必须有足够的时间运行每个微批处理(每个微批处理都将执行 COPY 语句)。您可以使用以下等式估算估计每个微批处理的平均可用时间:
TimePerMicrobatch=(FrameDuration*Parallelism)/Microbatches

这只是粗略估计,因为有多种因素会影响每个微批处理能够运行的时间量。

如果为每个微批处理分配的时间低于 2 秒,vkconfig 实用程序会向您发出警告。您通常应该为每个微批处理分配两秒以上的时间,以便让调度程序能够加载数据流中的所有数据。

默认值: 00:05:00

--message_max_bytesmax_message_size

指定 Kafka 协议批处理消息的最大大小(以字节为单位)。

默认值: 25165824

--new-source-policy``{FAIR|START|END}
确定 Vertica 如何将资源分配给以下任一新添加的源:
  • FAIR:采用以前批处理中的平均时间长度,并适当地调度自己。

  • START:所有新源均在时间范围开始时启动。批处理获得的运行时间最短。

  • END:所有新源均在时间范围结束时启动。批处理获得的运行时间最长。

默认值: FAIR

--operatorusername
允许 dbadmin 为之前创建的 Vertica 用户或角色授予权限。

此选项会为指定用户授予对调度程序实例的所有权限以及对 libkafka 库及其所有 UDx 的 EXECUTE 权限。

授予操作员权限会让用户有权从任何群集(可从 Vertica 节点进行访问)中的任何源读取数据。

dbadmin 必须为用户授予单独的权限,使其对目标表具有写入权限。

需要使用 --create 共享的实用程序选项。创建调度程序后,使用 --add-operator 选项授予操作权限。

要撤销权限,请使用 --remove-operator 选项。

--remove-operatoruser_name
从 Vertica 用户帐户中移除对调度程序的访问权限。需要使用 --update 共享的实用程序选项。
--resource-poolpool_name
此调度程序执行的所有查询将使用的资源池。必须提前创建此池。

默认值: GENERAL

--upgrade
将现有调度程序和配置架构升级到当前 Vertica 版本。升级后的调度程序版本不会向后兼容早期版本。要将调度程序升级到备用架构,请使用 upgrade-to-schema 参数。有关详细信息,请参阅在 Vertica 升级后更新调度程序
--upgrade-to-schemaschema name
将调度程序的架构复制到由 schema name 指定的新架构,然后进行升级使其与当前版本的 Vertica 兼容。Vertica 不会修改旧架构。需要使用 --upgrade 调度程序实用程序选项。
--validation-type``{ERROR|WARN|SKIP}
已重命名自 --skip-validation,用于指定在调度程序上执行的验证级别。无效的 SQL 语法和其他错误可能会导致微批处理无效。Vertica 支持以下验证类型:
  • 错误如果验证失败,则取消配置或创建。

  • WARN:如果验证失败,则继续执行任务,但会显示警告。

  • SKIP:不执行验证。

有关验证的详细信息,请参考使用调度程序自动使用来自 Kafka 的数据

默认值: ERROR

请参阅常用 vkconfig 脚本选项以了解所有 vkconfig 工具中提供的选项。

示例

以下示例显示如何使用调度程序实用程序选项。

授予用户 Jim 对 StreamConfig 调度程序的权限。指定使用 --config-schema 选项对 stream_config 调度程序进行编辑:

$ /opt/vertica/packages/kafka/bin/vkconfig scheduler --update --config-schema stream_config --add-operator Jim

编辑默认 stream_config 调度程序,以便每个微批处理在结束前等待数据一秒钟:

$ /opt/vertica/packages/kafka/bin/vkconfig scheduler --update --eof-timeout-ms 1000

将名为 iot_scheduler_8.1 的调度程序升级到与当前 Vertica 版本兼容的名为 iot_scheduler_9.0 的新调度程序:

$ /opt/vertica/packages/kafka/bin/vkconfig scheduler --upgrade --config-schema iot_scheduler_8.1 \
                                           --upgrade-to-schema iot_scheduler_9.0

删除架构 scheduler219a:

$ /opt/vertica/packages/kafka/bin/vkconfig scheduler --drop --config-schema  scheduler219a --username dbadmin

对于 weblogs.conf 中定义的调度程序,请读取可以使用调度程序工具设置的选项的当前设置。

$ vkconfig scheduler --read --conf weblog.conf
{"version":"v9.2.0", "frame_duration":"00:00:10", "resource_pool":"weblog_pool",
"config_refresh":"00:05:00", "new_source_policy":"FAIR",
"pushback_policy":"LINEAR", "pushback_max_count":5, "auto_sync":true,
"consumer_group_id":null}

9.3 - 群集工具选项

vkconfig 脚本的群集工具可用于定义调度程序连接到的流式传输主机。

语法

vkconfig cluster {--create | --read | --update | --delete} [--cluster cluster_name] [other_options...]
--create

创建新的负载规范,不能与 ‑‑delete‑‑read‑‑update 一起使用。

--read
输出调度程序中定义的所有群集的设置。此输出采用 JSON 格式。不能与 --create--delete--update 配合使用。

您可以通过在 --cluster 选项中提供一个或多个群集名称将输出限制为特定群集。此外,还可以使用 --hosts 选项将输出限制为包含一个或多个特定主机的群集。使用逗号分隔多个值。

您可以在这些选项中使用 LIKE 通配符。有关使用通配符的详细信息,请参阅 LIKE 谓词

--update

更新现有的 Set Snippet Variable Value in Topic。不能与 --create--delete--read 配合使用。

--delete

删除 Set Snippet Variable Value in Topic。不能与 --create--read--update 配合使用。

--dump

当您将此选项与 --read 选项一起使用时,vkconfig 会输出用来检索数据的 Vertica 查询,而不是输出数据本身。如果您想从 Vertica 中访问数据而无需通过 vkconfig,则此选项很有用。如果不与 --read 一起使用,此选项无效。

--clustercluster_name
要对其执行操作的群集的唯一名称,不区分大小写。--create--update--delete 需要使用此选项。
--hostsb1:port[,b2:port...]
标识要从 Kafka 群集添加、编辑或移除的代理主机。要标识多个主机,请使用逗号分隔符。
\--kafka_conf 'kafka_configuration_setting'

采用 JSON 格式直接传递给 rdkafka 库的选项/值对的对象。这是 Vertica 用于与 Kafka 进行通信的库。您可以使用此参数直接设置通过 Vertica 与 Kafka 的集成所无法提供的配置选项。有关详细信息,请参阅直接设置 Kafka 库选项

--new-clustercluster_name
更新后的群集名称。需要使用 --update 共享的实用程序选项。
--validation-typERROR|WARN|SKIP}
指定在创建或更新后的群集上执行的验证级别:
  • ERROR - 如果 vkconfig 无法验证该群集是否存在,请取消配置或创建。这是默认设置。

  • WARN - 验证失败时继续执行任务,但会显示警告。

  • SKIP - 不执行验证。

已重命名自 --skip-validation

请参阅常用 vkconfig 脚本选项以了解所有 vkconfig 工具中提供的选项。

示例

以下示例显示如何创建群集 StreamCluster1 并分配两个主机:

$ /opt/vertica/packages/kafka/bin/vkconfig cluster --create --cluster StreamCluster1 \
                                           --hosts 10.10.10.10:9092,10.10.10.11:9092
                                           --conf myscheduler.config

以下示例显示如何列出与 weblogs.conf 文件中定义的调度程序关联的所有群集:

$ vkconfig cluster --read --conf weblog.conf
{"cluster":"kafka_weblog",
"hosts":"kafka01.example.com:9092,kafka02.example.com:9092"}

9.4 - 源工具选项

使用 vkconfig 脚本的源工具创建、更新或删除源。

语法

vkconfig source {--create | --read | --update | --delete} --source source_name [other_options...]
--create

创建新的负载规范,不能与 ‑‑delete‑‑read‑‑update 一起使用。

--read
输出调度程序中定义的源的当前设置。输出采用 JSON 格式。不能与 --create--delete--update 配合使用。

默认情况下,此选项输出调度程序中定义的所有源。可以使用 --cluster--enabled--partitions--source 选项限制输出。输出将仅包含与这些选项中的值匹配的源。--enabled 选项只能有 true 或 false 值。--source 选项区分大小写。

您可以在这些选项中使用 LIKE 通配符。有关使用通配符的详细信息,请参阅 LIKE 谓词

--update

更新现有的 Set Snippet Variable Value in Topic。不能与 --create--delete--read 配合使用。

--delete

删除 Set Snippet Variable Value in Topic。不能与 --create--read--update 配合使用。

--sourcesource_name
标识要在调度程序配置中创建或更改的源。此选项区分大小写。可以为新源使用您喜欢的任何名称。大多数人使用调度程序从中加载数据的 Kafka 主题的名称。--create--update--delete 需要使用此选项。
--clustercluster_name
标识包含要创建或编辑的源的群集。必须已经在调度程序中定义该群集。
--dump

当您将此选项与 --read 选项一起使用时,vkconfig 会输出用来检索数据的 Vertica 查询,而不是输出数据本身。如果您想从 Vertica 中访问数据而无需通过 vkconfig,则此选项很有用。如果不与 --read 一起使用,此选项无效。

--enabled``TRUE|FALSE
如果为 TRUE,则源可供使用。
--new-clustercluster_name
更改此源所属的群集。

引用旧群集源的所有源现在都面向此群集。

需要: --update--source 选项

--new-sourcesource_name
将现有源的名称更新为此参数指定的名称。

需要: --update 共用的实用程序选项

--partitionscount
设置源中的分区数。

默认值:

群集中定义的分区数。

需要: --create--source 选项

您必须将此值与 Kafka 主题中的分区数保持一致。

已重命名自 --num-partitions

--validation-typERROR|WARN|SKIP}
控制对创建或更新的源执行的验证:
  • ERROR - 如果 vkconfig 无法验证该源,则取消配置或创建。这是默认设置。

  • WARN - 验证失败时继续执行任务,但会显示警告。

  • SKIP - 不执行验证。

已重命名自 --skip-validation

请参阅常用 vkconfig 脚本选项以了解所有 vkconfig 工具中提供的选项。

示例

以下示例显示了如何创建或更新 SourceFeed。

创建源 SourceFeed 并将其分配给 myscheduler.conf 配置文件定义的调度程序中的群集 StreamCluster1:

$ /opt/vertica/packages/kafka/bin/vkconfig source --create --source SourceFeed \
                                           --cluster StreamCluster1 --partitions 3
                                           --conf myscheduler.conf

更新现有源 SourceFeed,以使用 myscheduler.conf 配置文件定义的调度程序中的现有群集 StreamCluster2:

$ /opt/vertica/packages/kafka/bin/vkconfig source --update --source SourceFeed \
                                           --new-cluster StreamCluster2
                                           --conf myscheduler.conf

以下示例读取 weblogs.conf 文件定义的调度程序中定义的源。

$ vkconfig source --read --conf weblog.conf
{"source":"web_hits", "partitions":1, "src_enabled":true,
"cluster":"kafka_weblog",
"hosts":"kafka01.example.com:9092,kafka02.example.com:9092"}

9.5 - 目标工具选项

使用目标工具可以配置 Vertica 表从流式数据传输应用程序接收数据。

语法

vkconfig target {--create | --read | --update | --delete} [--target-table table --table_schema schema] [other_options...]
--create

创建新的负载规范,不能与 ‑‑delete‑‑read‑‑update 一起使用。

--read
输出调度程序中定义的目标。此输出采用 JSON 格式。不能与 --create--delete--update 配合使用。

默认情况下,此选项输出配置架构中定义的所有目标。可以使用 --target-schema--target-table 选项将输出限制到特定目标。vkconfig 脚本仅输出与这些选项中设置的值匹配的目标。

您可以在这些选项中使用 LIKE 通配符。有关使用通配符的详细信息,请参阅 LIKE 谓词

--update

更新现有的 Set Snippet Variable Value in Topic。不能与 --create--delete--read 配合使用。

--delete

删除 Set Snippet Variable Value in Topic。不能与 --create--read--update 配合使用。

--target-tabletable
从调度程序接收数据的 Vertica 表的名称。--create--update--delete 需要使用此选项。
--target-schema架构
包含目标表的现有 Vertica 架构。--create--update--delete 需要使用此选项。
--dump

当您将此选项与 --read 选项一起使用时,vkconfig 会输出用来检索数据的 Vertica 查询,而不是输出数据本身。如果您想从 Vertica 中访问数据而无需通过 vkconfig,则此选项很有用。如果不与 --read 一起使用,此选项无效。

--new-target-schemaschema_name
将与此架构关联的 Vertica 架构更改为已创建的新架构。

需要: --update 选项。

--new-target-tableschema_name
将与此架构关联的 Vertica 目标表更改为已创建的新表。

需要: --update 选项。

--validation-typERROR|WARN|SKIP}
控制对创建或更新的目标执行的验证:
  • ERROR - 如果 vkconfig 无法验证该表是否存在,则取消配置或创建。这是默认设置。

  • WARN - 如果验证失败,则创建或更新目标,但会显示警告。

  • SKIP - 不执行验证。

已重命名自 --skip-validation

请参阅常用 vkconfig 脚本选项以了解所有 vkconfig 工具中提供的选项。

示例

以下示例显示了如何从 public.streamtarget 表为 myscheduler.conf 配置文件中定义的调度程序创建目标:

$ /opt/vertica/packages/kafka/bin/vkconfig target --create --target-table streamtarget --conf myscheduler.conf

以下示例列出在 weblogs.conf 配置文件中定义的调度程序中的所有目标。

$ vkconfig target --read --conf weblog.conf
{"target_schema":"public", "target_table":"web_hits"}

9.6 - 加载规范工具选项

vkconfig 脚本的加载规范工具可用于为加载流式传输数据的 COPY 语句提供参数。

语法

$ vkconfig load-spec {--create | --read | --update | --delete} [--load-spec spec‑name] [other‑options...]
--create

创建新的负载规范,不能与 ‑‑delete‑‑read‑‑update 一起使用。

--read
输出调度程序中定义的加载规范的当前设置。此输出采用 JSON 格式。不能与 ‑‑create‑‑delete‑‑update 配合使用。

默认情况下,此选项会输出调度程序中定义的所有加载规范。您可以通过向这些选项提供一个值或逗号分隔的值列表来限制输出:

  • --load-spec

  • --filters

  • --uds-kv-parameters

  • --parser

  • --message-max-bytes

  • --parser-parameters

vkconfig 脚本仅输出与您提供的值匹配的加载规范配置。

您可以在这些选项中使用 LIKE 通配符。有关使用通配符的详细信息,请参阅 LIKE 谓词

--update

更新现有的 Set Snippet Variable Value in Topic。不能与 --create--delete--read 配合使用。

--delete

删除 Set Snippet Variable Value in Topic。不能与 --create--read--update 配合使用。

\--load-spec spec‑name
要对其执行操作的复制加载规范的唯一名称。--create--update--delete 需要使用此选项。
--dump

当您将此选项与 --read 选项一起使用时,vkconfig 会输出用来检索数据的 Vertica 查询,而不是输出数据本身。如果您想从 Vertica 中访问数据而无需通过 vkconfig,则此选项很有用。如果不与 --read 一起使用,此选项无效。

\--filters "filter‑name"
包含要在 COPY 语句中使用的所有 UDFilter 的 Vertica FILTER 链。有关筛选器的详细信息,请参考解析自定义格式
\--message-max-bytes max‑size

指定 Kafka 协议批处理消息的最大大小(以字节为单位)。

默认值: 25165824

\--new-load-spec new‑name
现有加载规范的新唯一名称。需要使用 --update 参数。
\--parser-parameters "key=value[,...]"
提供给 --parser 参数中指定的解析器的参数列表。在使用 Vertica 本机解析器时,调度程序会将这些参数传递给 COPY 语句,然后转而将其传递给解析器。
\--parser parser‑name
标识要用于指定目标的 Vertica UDParser。此解析器在调度程序为加载数据而运行的 COPY 语句中使用。如果使用的是 Vertica 本机解析器,则提供给 --parser-parameters 选项的值将传递给 COPY 语句。

默认值: KafkaParser

\--uds-kv-parameters key=value[,...]
用户定义的源的键值对的逗号分隔列表。
--validation-type {ERROR|WARN|SKIP}
将对创建或更新后的加载规范执行的验证指定为以下值之一:
  • ERROR:如果 vkconfig 无法验证加载规范,则取消配置或创建。这是默认设置。

  • WARN:如果验证失败,则继续执行任务,但会显示警告。

  • SKIP:不执行验证。

已重命名自 --skip-validation

请参阅常用 vkconfig 脚本选项以了解所有 vkconfig 工具中提供的选项。

示例

以下示例显示如何使用加载规范实用程序选项。

创建加载规范 Streamspec1

$ /opt/vertica/packages/kafka/bin/vkconfig load-spec --create --load-spec Streamspec1 --conf myscheduler.conf

将加载规范 Streamspec1 重命名为 Streamspec2

$ /opt/vertica/packages/kafka/bin/vkconfig load-spec --update --load-spec Streamspec1 \
                                                     --new-load-spec Streamspec2 \
                                                     --conf myscheduler.conf

更新加载规范 Filterspec 以使用 KafkaInsertLengths 筛选器和自定义解密筛选器:

$ /opt/vertica/packages/kafka/bin/vkconfig load-spec --update --load-spec Filterspec \
                                                     --filters "KafkaInsertLengths() DecryptFilter(parameter=Key)" \
                                                     --conf myscheduler.conf

读取加载规范 streamspec1 的当前设置:

$ vkconfig load-spec --read --load-spec streamspec1 --conf weblog.conf
{"load_spec":"streamspec1", "filters":null, "parser":"KafkaParser",
"parser_parameters":null, "load_method":"TRICKLE", "message_max_bytes":null,
"uds_kv_parameters":null}

9.7 - 微批处理工具选项

使用 vkconfig 脚本的微批处理工具,可以配置调度程序的微批处理。

语法

vkconfig microbatch {--create | --read | --update | --delete} [--microbatch name] [other_options...]
--create

创建新的负载规范,不能与 ‑‑delete‑‑read‑‑update 一起使用。

--read
输出调度程序中定义的所有微批处理的当前设置。此输出采用 JSON 格式。不能与 ‑‑create‑‑delete‑‑update 配合使用。

可以使用 ‑‑consumer-group-id‑‑enabled‑‑load-spec‑‑microbatch‑‑rejection-schema‑‑rejection-table‑‑target-schema‑‑target-table‑‑target-columns 选项将输出限制到特定的微批处理。‑‑enabled 选项仅接受 true 或 false 值。

您可以在这些选项中使用 LIKE 通配符。有关使用通配符的详细信息,请参阅 LIKE 谓词

--update

更新现有的 Set Snippet Variable Value in Topic。不能与 --create--delete--read 配合使用。

--delete

删除 Set Snippet Variable Value in Topic。不能与 --create--read--update 配合使用。

\--microbatch name
微批处理的唯一名称,不区分大小写。--create--update--delete 需要使用此选项。
\--add-source-cluster cluster_name
要分配给您使用 --microbatch 选项指定的微批处理的群集的名称。您可以在每个命令中使用此参数一次。此外,还可以将它与 --update 一起使用,以将源添加到微批处理。您只能将来自同一群集的源添加到同一个微批处理中。需要 \--add-source
\--add-source source_name
要分配给此微批处理的源的名称。您可以在每个命令中使用此参数一次。此外,还可以将它与 --update 一起使用,以将源添加到微批处理。需要 \--add-source-cluster
\--cluster cluster_name
--offset 选项将应用于的群集的名称。仅当微批处理定义了多个群集或提供了 --source 参数时才需要。需要 --offset 选项。
\--consumer-group-id id_name

Kafka 使用者组(Vertica 将向其报告消费消息进度)的名称。设置此值以禁用向 Kafka 使用者组报告进度。有关详细信息,请参阅通过使用者组监控 Vertica 消息使用情况

默认值: vertica_database-name

--dump

当您将此选项与 --read 选项一起使用时,vkconfig 会输出用来检索数据的 Vertica 查询,而不是输出数据本身。如果您想从 Vertica 中访问数据而无需通过 vkconfig,则此选项很有用。如果不与 --read 一起使用,此选项无效。

--enabled TRUE|FALSE
为 TRUE 时,允许执行微批处理。
\--load-spec loadspec_name
处理此微批处理时要使用的加载规范。
\--max-parallelism max_num_loads
为微批处理创建的同步 COPY 语句的最大数量。调度程序会将具有多个分区的单个微批处理动态拆分为具有较少分区的 max_num_loads COPY 语句。

使用此选项可以:

\--new-microbatch updated_name
更新后的微批处理名称。需要 --update 选项。
\--offset partition_offset[,...]
微批处理开始加载的源中消息的偏移量。如果使用此参数,则必须为源中的每个分区或在 --partition 选项中列出的每个分区提供偏移量值。

您可以使用此选项跳过源中的某些消息或重新加载之前读取的消息。

有关详细信息,请参阅下面的特殊起始偏移量值

\--partition partition[,...]
--offset 选项中指定的偏移量将应用于的一个或多个分区。如果您提供此选项,则 --offset 选项中指定的偏移量将应用于您指定的分区。需要 --offset 选项。
\--rejection-schema schema_name
现有的 Vertica 架构,其中包含用于存储被拒绝消息的表。
\--rejection-table table_name
用于存储被拒绝消息的现有 Vertica 表。
\--remove-source-cluster cluster_name
要从此微批处理中移除的群集的名称。 您可以在每个命令中使用此参数一次。需要 --remove-source
\--remove-source source_name
要从此微批处理中移除的源的名称。您可以在每个命令中使用此参数一次。此外,还可以将它与 --update 一起使用,以从微批处理中移除多个源。需要 --remove-source-cluster
\--source source_name
--offset 选项中的偏移量将应用于的源的名称。当微批处理定义多个源或指定 --cluster 参数时需要。需要 --offset 选项。
\--target-columns column_expression
目标表的列表达式,其中 column_expression 可以是列的逗号分隔列表或完整的表达式。

有关列表达式的描述,请参阅 COPY 语句参数

\--target-schema schema_name
与此微批处理关联的现有 Vertica 目标架构。
\--target-table table_name
与目标对应的 Vertica 表的名称。此表必须属于目标架构。
\--validation-type{ERROR|WARN|SKIP}
控制对已创建或已更新的微批处理执行的验证:
  • ERROR - 如果 vkconfig 无法验证该微批处理,则取消配置或创建。这是默认设置。

  • WARN - 验证失败时继续执行任务,但会显示警告。

  • SKIP - 不执行验证。

已重命名自 --skip-validation

请参阅常用 vkconfig 脚本选项以了解所有 vkconfig 工具中提供的选项。

stream 参数的 start_offset 部分允许您从主题分区中的特定点开始加载消息。它还接受两个特殊偏移值之一:

  • -2 告知Set Snippet Variable Value in Topic从主题分区中最早可用的消息开始加载。当您需要从 Kafka 主题的分区中尽量加载更多消息时,此值十分有用。

  • -3 告知Set Snippet Variable Value in Topic从使用者组保存的偏移量开始加载。如果使用者组没有保存的偏移量,它会从主题分区中最早可用的消息开始加载。有关详细信息,请参阅通过使用者组监控 Vertica 消息使用情况

示例

此示例显示如何创建微批处理 mbatch1。此微批处理会标识微批处理的架构、目标表、加载规范和源:

$ /opt/vertica/packages/kafka/bin/vkconfig microbatch --create --microbatch mbatch1 \
                                                    --target-schema public \
                                                    --target-table BatchTarget \
                                                    --load-spec Filterspec \
                                                    --add-source SourceFeed \
                                                    --add-source-cluster StreamCluster1 \
                                                    --conf myscheduler.conf

此示例演示了如何在 weblog.conf 配置文件中定义的调度程序中列出微批处理的当前设置。

$ vkconfig microbatch --read --conf weblog.conf
{"microbatch":"weblog", "target_columns":null, "rejection_schema":null,
"rejection_table":null, "enabled":true, "consumer_group_id":null,
"load_spec":"weblog_load", "filters":null, "parser":"KafkaJSONParser",
"parser_parameters":null, "load_method":"TRICKLE", "message_max_bytes":null,
"uds_kv_parameters":null, "target_schema":"public", "target_table":"web_hits",
"source":"web_hits", "partitions":1, "src_enabled":true, "cluster":"kafka_weblog",
"hosts":"kafka01.example.com:9092,kafka02.example.com:9092"}

9.8 - 启动工具选项

使用 vkconfig 脚本启动工具可为调度程序实例分配名称。

语法

vkconfig launch [options...]
--enable-ssl``{true|false}
(可选)在 Kafka 和 Vertica 之间启用 SSL 身份验证
。有关详细信息,请参考 使用 Kafka 进行 TLS/SSL 加密
--ssl-ca-alias别名
根证书颁发机构的用户定义别名,您可用来验证 Vertica 和 Kafka 之间的通信。此参数仅在启用 SSL 时使用。
--ssl-key-alias别名
密钥/证书对的用户定义别名,您可用来验证 Vertica 和 Kafka 之间的通信。此参数仅在启用 SSL 时使用。
--ssl-key-passwordpassword
用于创建 SSL 密钥的密码。此参数仅在启用 SSL 时使用。
--instance-namename
(可选)允许命名正在运行调度程序的进程。您可以在查看 scheduler_history 表时使用此命令,从而查找当前正在运行的实例。
\--kafka_conf 'kafka_configuration_setting'

采用 JSON 格式直接传递给 rdkafka 库的选项/值对的对象。这是 Vertica 用于与 Kafka 进行通信的库。您可以使用此参数直接设置通过 Vertica 与 Kafka 的集成所无法提供的配置选项。有关详细信息,请参阅直接设置 Kafka 库选项

请参阅常用 vkconfig 脚本选项以了解所有 vkconfig 工具中提供的选项。

示例

此示例显示如何启动 myscheduler.conf 配置文件中定义的调度程序并为其指定实例名称 PrimaryScheduler:

$ nohup /opt/vertica/packages/kafka/bin/vkconfig launch --instance-name PrimaryScheduler \
  --conf myscheduler.conf >/dev/null 2>&1 &

此示例显示如何在启用 SSL 的情况下启动名为 SecureScheduler 的实例:

$ nohup /opt/vertica/packages/kafka/bin/vkconfig launch --instance-name SecureScheduler --enable-SSL true \
                                                  --ssl-ca-alias authenticcert --ssl-key-alias ourkey \
                                                  --ssl-key-password secret \
                                                  --conf myscheduler.conf \
                                                  >/dev/null 2>&1 &

9.9 - 关闭工具选项

使用 vkconfig 脚本的关闭工具终止主机上运行的一个或全部 Vertica 调度程序。始终在重新启动调度程序之前运行此命令,以确保调度程序已正确关闭。

语法

vkconfig shutdown [options...]

请参阅常用 vkconfig 脚本选项了解所有 vkconfig 工具中提供的选项。

示例

要终止在主机上运行的所有调度程序,请使用不带选项的 shutdown 命令:

$ /opt/vertica/packages/kafka/bin/vkconfig shutdown

使用 --conf--config-schema 选项指定要关闭的调度程序。以下命令终止使用相同 --conf myscheduler.conf 选项启动的调度程序:

$ /opt/vertica/packages/kafka/bin/vkconfig shutdown --conf myscheduler.conf

9.10 - 统计信息工具选项

统计信息工具可用于访问调度程序已运行的微批处理的历史记录。该工具以 JSON 格式将微批处理的日志输出到标准输出。可以使用其选项来筛选微批处理列表,以获得您感兴趣的微批处理。

语法

vkconfig statistics [options]
\--cluster "cluster"[,"cluster2"...]
仅返回从名称与您提供的列表中的名称相匹配的群集中检索数据的微批处理。
--dump
返回 vkconfig 为了从调度程序表中提取数据而会执行的 SQL 查询,而不是返回微批处理数据。如果您希望使用 Vertica 客户端应用程序来获取微批处理日志,而不是使用 vkconfig 的 JSON 输出,则可以使用此选项。
\--from-timestamp "timestamp"
仅返回在 timestamp 之后开始的微批处理。timestamp 值采用 yyyy-[m]m-[d]d hh:mm:ss 格式。

不能与 --last 结合使用。

\--last number
返回满足所有其他筛选器的 number 个最近的微批处理。不能与 --from-timestamp--to-timestamp 结合使用。
\--microbatch "name"[,"name2"...]
仅返回名称与逗号分隔列表中的名称之一相匹配的微批处理。
\--partition partition#[,partition#2...]
仅返回从与分区列表中的值之一相匹配的主题分区访问数据的微批处理。
\--source "source"[,"source2"...]
仅返回从名称与您提供给此实参的列表中的名称之一相匹配的源访问数据的微批处理。
--target-schema "schema"[,"schema2"...]
仅返回将数据写入名称与目标架构列表实参中的名称之一相匹配的 Vertica 架构的微批处理。
--target-table "table"[,"table2"...]
仅返回将数据写入名称与目标表列表实参中的名称之一相匹配的 Vertica 表的微批处理。
\--to-timestamp "timestamp"
仅返回在 timestamp 之前开始的微批处理。timestamp 值采用 yyyy-[m]m-[d]d hh:mm:ss 格式。

不能与 --last 结合使用。

请参阅常用 vkconfig 脚本选项以了解所有 vkconfig 工具中提供的选项。

用法注意事项

  • 可以在提供给 --cluster--microbatch--source--target-schema--target-table 实参的值中使用 LIKE 通配符。此功能可用于匹配微批处理数据中的部分字符串。有关使用通配符的详细信息,请参阅 LIKE 谓词

  • --cluster--microbatch--source--target-schema--target-table 实参的字符串比较不区分大小写。

  • 提供给 --from-timestamp--to-timestamp 实参的日期和时间值使用 java.sql.timestamp 格式以便解析该值。此格式的解析可以接受您可能认为无效并希望它拒绝的值。例如,如果提供的时间戳为 01-01-2018 24:99:99,Java 时间戳解析器会静默地将日期转换为 2018-01-02 01:40:39,而不是返回错误。

示例

以下示例获取 weblog.conf 文件中定义的调度程序运行的最后一个微批处理:

$ /opt/vertica/packages/kafka/bin/vkconfig statistics --last 1 --conf weblog.conf
{"microbatch":"weblog", "target_schema":"public", "target_table":"web_hits",
"source_name":"web_hits", "source_cluster":"kafka_weblog", "source_partition":0,
"start_offset":80000, "end_offset":79999, "end_reason":"END_OF_STREAM",
"end_reason_message":null, "partition_bytes":0, "partition_messages":0,
"timeslice":"00:00:09.793000", "batch_start":"2018-11-06 09:42:00.176747",
"batch_end":"2018-11-06 09:42:00.437787", "source_duration":"00:00:00.214314",
"consecutive_error_count":null, "transaction_id":45035996274513069,
"frame_start":"2018-11-06 09:41:59.949", "frame_end":null}

如果调度程序正在从多个分区读取,则 --last 1 选项会列出来自每个分区的最后一个微批处理:

$ /opt/vertica/packages/kafka/bin/vkconfig statistics --last 1 --conf iot.conf
{"microbatch":"iotlog", "target_schema":"public", "target_table":"iot_data",
"source_name":"iot_data", "source_cluster":"kafka_iot", "source_partition":0,
"start_offset":-2, "end_offset":-2, "end_reason":"DEADLINE",
"end_reason_message":null, "partition_bytes":0, "partition_messages":0,
"timeslice":"00:00:09.842000", "batch_start":"2018-11-06 12:52:49.387567",
"batch_end":"2018-11-06 12:52:59.400219", "source_duration":"00:00:09.950127",
"consecutive_error_count":null, "transaction_id":45035996274537015,
"frame_start":"2018-11-06 12:52:49.213", "frame_end":null}
{"microbatch":"iotlog", "target_schema":"public", "target_table":"iot_data",
"source_name":"iot_data", "source_cluster":"kafka_iot", "source_partition":1,
"start_offset":1604, "end_offset":1653, "end_reason":"END_OF_STREAM",
"end_reason_message":null, "partition_bytes":4387, "partition_messages":50,
"timeslice":"00:00:09.842000", "batch_start":"2018-11-06 12:52:49.387567",
"batch_end":"2018-11-06 12:52:59.400219", "source_duration":"00:00:00.220329",
"consecutive_error_count":null, "transaction_id":45035996274537015,
"frame_start":"2018-11-06 12:52:49.213", "frame_end":null}
{"microbatch":"iotlog", "target_schema":"public", "target_table":"iot_data",
"source_name":"iot_data", "source_cluster":"kafka_iot", "source_partition":2,
"start_offset":1603, "end_offset":1652, "end_reason":"END_OF_STREAM",
"end_reason_message":null, "partition_bytes":4383, "partition_messages":50,
"timeslice":"00:00:09.842000", "batch_start":"2018-11-06 12:52:49.387567",
"batch_end":"2018-11-06 12:52:59.400219", "source_duration":"00:00:00.318997",
"consecutive_error_count":null, "transaction_id":45035996274537015,
"frame_start":"2018-11-06 12:52:49.213", "frame_end":null}
{"microbatch":"iotlog", "target_schema":"public", "target_table":"iot_data",
"source_name":"iot_data", "source_cluster":"kafka_iot", "source_partition":3,
"start_offset":1604, "end_offset":1653, "end_reason":"END_OF_STREAM",
"end_reason_message":null, "partition_bytes":4375, "partition_messages":50,
"timeslice":"00:00:09.842000", "batch_start":"2018-11-06 12:52:49.387567",
"batch_end":"2018-11-06 12:52:59.400219", "source_duration":"00:00:00.219543",
"consecutive_error_count":null, "transaction_id":45035996274537015,
"frame_start":"2018-11-06 12:52:49.213", "frame_end":null}

可以使用 --partition 实参只获取所需的分区:

$ /opt/vertica/packages/kafka/bin/vkconfig statistics --last 1 --partition 2 --conf iot.conf
{"microbatch":"iotlog", "target_schema":"public", "target_table":"iot_data",
"source_name":"iot_data", "source_cluster":"kafka_iot", "source_partition":2,
"start_offset":1603, "end_offset":1652, "end_reason":"END_OF_STREAM",
"end_reason_message":null, "partition_bytes":4383, "partition_messages":50,
"timeslice":"00:00:09.842000", "batch_start":"2018-11-06 12:52:49.387567",
"batch_end":"2018-11-06 12:52:59.400219", "source_duration":"00:00:00.318997",
"consecutive_error_count":null, "transaction_id":45035996274537015,
"frame_start":"2018-11-06 12:52:49.213", "frame_end":null}

如果调度程序从多个源读取,则 --last 1 选项会输出来自每个源的最后一个微批处理:

$ /opt/vertica/packages/kafka/bin/vkconfig statistics --last 1 --conf weblog.conf
{"microbatch":"weberrors", "target_schema":"public", "target_table":"web_errors",
"source_name":"web_errors", "source_cluster":"kafka_weblog",
"source_partition":0, "start_offset":10000, "end_offset":9999,
"end_reason":"END_OF_STREAM", "end_reason_message":null,
"partition_bytes":0, "partition_messages":0, "timeslice":"00:00:04.909000",
"batch_start":"2018-11-06 10:58:02.632624",
"batch_end":"2018-11-06 10:58:03.058663", "source_duration":"00:00:00.220618",
"consecutive_error_count":null, "transaction_id":45035996274523991,
"frame_start":"2018-11-06 10:58:02.394", "frame_end":null}
{"microbatch":"weblog", "target_schema":"public", "target_table":"web_hits",
"source_name":"web_hits", "source_cluster":"kafka_weblog", "source_partition":0,
"start_offset":80000, "end_offset":79999, "end_reason":"END_OF_STREAM",
"end_reason_message":null, "partition_bytes":0, "partition_messages":0,
"timeslice":"00:00:09.128000", "batch_start":"2018-11-06 10:58:03.322852",
"batch_end":"2018-11-06 10:58:03.63047", "source_duration":"00:00:00.226493",
"consecutive_error_count":null, "transaction_id":45035996274524004,
"frame_start":"2018-11-06 10:58:02.394", "frame_end":null}

可以使用通配符来启用部分匹配。以下示例演示了如何获取名称以“log”结尾的所有微批处理的最后一个微批处理:

~$ /opt/vertica/packages/kafka/bin/vkconfig statistics --microbatch "%log" \
                                            --last 1 --conf weblog.conf
{"microbatch":"weblog", "target_schema":"public", "target_table":"web_hits",
"source_name":"web_hits", "source_cluster":"kafka_weblog", "source_partition":0,
"start_offset":80000, "end_offset":79999, "end_reason":"END_OF_STREAM",
"end_reason_message":null, "partition_bytes":0, "partition_messages":0,
"timeslice":"00:00:04.874000", "batch_start":"2018-11-06 11:37:16.17198",
"batch_end":"2018-11-06 11:37:16.460844", "source_duration":"00:00:00.213129",
"consecutive_error_count":null, "transaction_id":45035996274529932,
"frame_start":"2018-11-06 11:37:15.877", "frame_end":null}

要获取特定时间段的微批处理,请使用 --from-timestamp--to-timestamp 实参。以下示例获取 iot.conf 中定义的调度程序在 2018-11-06 12:52:30 到 12:53:00 之间从分区 #2 进行读取的微批处理。

$ /opt/vertica/packages/kafka/bin/vkconfig statistics  --partition 1 \
                        --from-timestamp "2018-11-06 12:52:30" \
                        --to-timestamp "2018-11-06 12:53:00" --conf iot.conf
{"microbatch":"iotlog", "target_schema":"public", "target_table":"iot_data",
"source_name":"iot_data", "source_cluster":"kafka_iot", "source_partition":1,
"start_offset":1604, "end_offset":1653, "end_reason":"END_OF_STREAM",
"end_reason_message":null, "partition_bytes":4387, "partition_messages":50,
"timeslice":"00:00:09.842000", "batch_start":"2018-11-06 12:52:49.387567",
"batch_end":"2018-11-06 12:52:59.400219", "source_duration":"00:00:00.220329",
"consecutive_error_count":null, "transaction_id":45035996274537015,
"frame_start":"2018-11-06 12:52:49.213", "frame_end":null}
{"microbatch":"iotlog", "target_schema":"public", "target_table":"iot_data",
"source_name":"iot_data", "source_cluster":"kafka_iot", "source_partition":1,
"start_offset":1554, "end_offset":1603, "end_reason":"END_OF_STREAM",
"end_reason_message":null, "partition_bytes":4371, "partition_messages":50,
"timeslice":"00:00:09.788000", "batch_start":"2018-11-06 12:52:38.930428",
"batch_end":"2018-11-06 12:52:48.932604", "source_duration":"00:00:00.231709",
"consecutive_error_count":null, "transaction_id":45035996274536981,
"frame_start":"2018-11-06 12:52:38.685", "frame_end":null}

以下示例演示了如何使用 --dump 实参获取 vkconfig 为检索上一个示例的输出而执行的 SQL 语句:

$ /opt/vertica/packages/kafka/bin/vkconfig statistics  --dump --partition 1 \
                       --from-timestamp "2018-11-06 12:52:30" \
                       --to-timestamp "2018-11-06 12:53:00" --conf iot.conf
SELECT microbatch, target_schema, target_table, source_name, source_cluster,
source_partition, start_offset, end_offset, end_reason, end_reason_message,
partition_bytes, partition_messages, timeslice, batch_start, batch_end,
last_batch_duration AS source_duration, consecutive_error_count, transaction_id,
frame_start, frame_end FROM "iot_sched".stream_microbatch_history WHERE
(source_partition = '1') AND (frame_start >= '2018-11-06 12:52:30.0') AND
(frame_start < '2018-11-06 12:53:00.0') ORDER BY frame_start DESC, microbatch,
source_cluster, source_name, source_partition;

9.11 - 同步工具选项

同步实用程序通过查询源定义的 Kafka 群集代理来立即更新所有源定义。默认情况下,它会更新目标架构中定义的所有源。要仅更新特定源,请使用 --source--cluster 选项指定要更新的源。

语法

vkconfig sync [options...]
--sourcesource_name
源同步的名称。此源必须已存在于目标架构中。
--clustercluster_name
标识包含要同步的源的群集。必须已经在调度程序中定义该群集。
\--kafka_conf 'kafka_configuration_setting'

采用 JSON 格式直接传递给 rdkafka 库的选项/值对的对象。这是 Vertica 用于与 Kafka 进行通信的库。您可以使用此参数直接设置通过 Vertica 与 Kafka 的集成所无法提供的配置选项。有关详细信息,请参阅直接设置 Kafka 库选项

请参阅常用 vkconfig 脚本选项了解所有 vkconfig 工具中提供的选项。

10 - Kafka 函数参考

此部分将列出构成 Vertica 的 Kafka 集成功能的函数。

10.1 - KafkaAvroParser

KafkaAvroParser 会解析 Avro 格式的 Kafka 消息并将其加载到常规 Vertica 表或 Vertica Flex 表中。

语法

KafkaAvroParser(param=value[,...])
enforce_length
设置为 TRUE 时,如果有值太宽而无法容纳到其所在的列中,则拒绝该行。使用默认设置 (FALSE) 时,解析器会截断任何太宽而无法容纳到列最大宽度内的值。
reject_on_materialized_type_error
设置为 TRUE 时,如果行中含有实体化列值,但该列值无法映射到实体化列的数据类型,则拒绝该行。
flatten_maps
如果设置为 TRUE,则将所有 Avro 映射扁平化。
flatten_arrays
如果设置为 TRUE,则将 Avro 数组扁平化。
flatten_records
如果设置为 TRUE,则将所有 Avro 记录扁平化。
external_schema
将 Avro 文件的架构指定为 JSON 字符串。如果未指定此参数,解析器假定每个消息都具有架构。如果使用的是架构注册表,请勿使用此参数。
codec
指定编写 Avro 文件所用的编解码器。有效值包括:
  • 'default' - 不压缩数据,因此不需要编解码器

  • 'deflate' - 使用 Deflate 编解码器压缩数据

  • 'snappy' - Snappy 压缩

with_metadata
如果设置为 TRUE,消息包括 Avro 基准、架构和对象元数据。默认情况下,KafkaAvroParser 会解析消息,但不包括架构和元数据。如果启用此参数,请使用 Avro API 编写消息,并确认它们仅包含 Avro 基准。默认值为 FALSE。
schema_registry_url
必需。指定 Confluent 架构注册表的 URL。需要使用此参数根据架构注册表版本加载数据。如果使用的是外部架构,请勿使用此参数。有关详细信息,请参考 在 Kafka 中使用架构注册表。
schema_registry_ssl_ca_path
进行 TLS 连接时需要提供。Vertica 节点文件系统上指向已对架构注册表服务器证书签名的证书颁发机构 (CA) 的路径。

每个 Vertica 节点都必须将 CA 存储在相同的文件系统路径中。

schema_registry_ssl_cert_path
Vertica 节点文件系统上指向由架构注册表信任的证书颁发机构 (CA) 所颁发的客户端证书的路径。
schema_registry_ssl_key_path
Vertica 服务器文件系统上指向使用 schema_registry_ssl_cert_path 定义的客户端证书的私钥的路径。
schema_registry_ssl_key_password_path
Vertica 服务器文件系统上指向使用 schema_registry_ssl_key_path 定义的私钥的可选密码的路径。
schema_registry_subject
在架构注册表中,表示用于加载数据的架构的主题。
schema_registry_version
在架构注册表中,表示用于加载数据的架构的版本。
key_separator
设置用作键之间的分隔符的字符。

数据类型

KafkaAvroParser 支持与 favroparser 相同的数据类型。有关详细信息,请参阅Avro 数据

示例

以下示例演示了如何以 Avro 格式从 Kafka 加载数据。该语句:

  • 会将数据加载到名为 weather_logs 的现有 Flex 表中。

  • 会从默认 Kafka 代理(通过端口 9092 在本地系统上运行)复制数据。

  • 源被命名为 temperature。

  • 源具有一个分区。

  • 加载从偏移量 0 处开始。

  • 加载在 10 秒后或在加载到达源结尾后结束,以先发生者为准。

  • KafkaAvroParser 不会将它在源中找到的任何数组、映射或记录扁平化。

  • 数据的架构将在语句中以 JSON 字符串的形式提供。该架构定义了名为 Weather 的记录类型,其中包含表示工作站名称、时间和温度的字段。

  • 被拒绝的数据行将保存到名为 t_rejects1 的表中。

=> COPY weather_logs
   SOURCE KafkaSource(stream='temperature|0|0', stop_on_eof=true,
                      duration=interval '10 seconds')
   PARSER KafkaAvroParser(flatten_arrays=False, flatten_maps=False, flatten_records=False,
                          external_schema=E'{"type":"record","name":"Weather","fields":'
                                           '[{"name":"station","type":"string"},'
                                           '{"name":"time","type":"long"},'
                                           '{"name":"temp","type":"int"}]}')
   REJECTED DATA AS TABLE "t_rejects1";

10.2 - KafkaCheckBrokers

检索 Kafka 群集中各个代理的相关信息。此函数主要供内部使用 — 流式传输作业调度程序会使用它来获取 Kafka 群集中代理的列表。您可以调用该函数来确定 Vertica 可识别的代理。

语法

KafkaCheckBrokers(USING PARAMETERS brokers='hostname:port[,hostname2:port...]'
                                   [, kafka_conf='kafka_configuration_setting']
                                   [, timeout=timeout_sec])
brokers
Kafka 群集中代理的主机名和端口号,用于检索代理的列表。您可以使用逗号分隔的列表提供多个代理。如果列表包含来自多个 Kafka 群集的代理,则会查询包含列表中最后一个主机的群集。
kafka_conf

采用 JSON 格式直接传递给 rdkafka 库的选项/值对的对象。这是 Vertica 用于与 Kafka 进行通信的库。您可以使用此参数直接设置通过 Vertica 与 Kafka 的集成所无法提供的配置选项。有关详细信息,请参阅直接设置 Kafka 库选项

timeout

等待 Kafka 群集做出响应的整数秒数。

示例

=> SELECT KafkaCheckBrokers(USING PARAMETERS brokers='kafka01.example.com:9092')
          OVER ();
 broker_id |        hostname     | port
-----------+---------------------+------
         2 | kafka03.example.com | 9092
         1 | kafka02.example.com | 9092
         3 | kafka04.example.com | 9092
         0 | kafka01.example.com | 9092
(4 rows)

10.3 - KafkaExport

将 Vertica 数据发送到 Kafka。

如果 Vertica 成功将所有数据行导出到 Kafka,则此函数将返回零行。您可以使用此函数的输出将失败消息复制到辅助表,以备评估和重新处理。

语法

SELECT KafkaExport(partitionColumn, keyColumn, valueColumn
    USING PARAMETERS brokers='host[:port][,host...]',
    topic='topicname'
    [,kafka_conf='kafka_configuration_setting']
    [,fail_on_conf_parse_error=Boolean])
OVER (partition_clause) FROM table;

参数

partitionColumn
要导出的目标分区。如果将此值设为 NULL,Vertica 将使用默认分区架构。您可以使用 partition 实参将消息发送至映射到 Vertica 分段的分区。
keyColumn
与 valueColumn 关联的用户定义的键值。使用 NULL 跳过此实参。
valueColumn
消息自身。该列是 LONG VARCHAR,允许向 Kafka 最多发送 32MB 数据。但是,Kafka 可能会对消息大小施加自己的限制。
brokers
包含 Kafka 群集中代理的一个或多个主机名或 IP 地址(含可选端口号)的逗号分隔列表的字符串。
topic
要导出到的 Kafka 主题。
kafka_conf

采用 JSON 格式直接传递给 rdkafka 库的选项/值对的对象。这是 Vertica 用于与 Kafka 进行通信的库。您可以使用此参数直接设置通过 Vertica 与 Kafka 的集成所无法提供的配置选项。有关详细信息,请参阅直接设置 Kafka 库选项

fail_on_conf_parse_error

确定 kafka_conf 包含格式不正确的选项和值或者无效的配置属性时,该函数是否失败。

默认值:FALSE

有关接受的选项和值格式,请参阅直接设置 Kafka 库选项

有关有效配置属性的列表,请参阅 rdkafka GitHub 存储库

示例

以下示例会将 iot_report 表中的每一行转换为 JSON 对象,然后将其导出到名为 iot-report 的 Kafka 主题。

iot_report 表包含以下列和数据:

=> SELECT * FROM iot_report;
 server |        date         |        location         |    id
--------+---------------------+-------------------------+----------
      1 | 2016-10-11 04:09:28 |  -14.86058, 112.75848   | 70982027
      1 | 2017-07-02 12:37:48 |  -21.42197, -127.17672  | 49494918
      2 | 2017-01-20 03:05:24 |  37.17372, 136.14026    | 36670100
      2 | 2017-07-29 11:38:37 |  -38.99517, 171.72671   | 52049116
      1 | 2017-10-19 14:04:33 |  -71.72156, -36.27381   | 94328189
(5 rows)

要构建 KafkaExport 函数,请提供以下值以定义 Kafka 消息:

  • partitionColumn:使用 server 列。由于 server 列值从 1 开始,而 Kafka 分区从 0 开始,因此需从 server 值中减去 1。

  • keyColumn:使用 id 列。此列要求您将 id显式转换为 VARCHAR 类型。

  • valueColumn:每条消息都会在已导出的 JSON 对象中将 date 和 location 列格式化为键/值对。

    要将这些行转换为 JSON 格式,请使用 ROW 函数将 date 和 location 列转换为结构化数据。然后,将 ROW 函数传递给 TO_JSON 函数,以将数据结构编码为 JSON 对象。

填写所需的剩余函数实参并执行 KafkaExport 函数。如果成功,该函数将返回 0 行:

=> SELECT KafkaExport(server - 1, id::VARCHAR, TO_JSON(ROW(date, location))
                      USING PARAMETERS brokers='broker01:9092,broker02:9092',
                      topic='iot-results')
   OVER (PARTITION BEST)
   FROM iot_report;
 partition | key | message | failure_reason
-----------+-----+---------+----------------
(0 rows)

使用 kafkacat 验证使用者是否在 iot-results 主题中包含 JSON 格式的数据:

$ /opt/vertica/packages/kafka/bin/kafkacat -C -t iot-results -b broker01:9092,broker02:9092
{"date":"2017-01-20 03:05:24","location":" 37.17372, 136.14026   "}
{"date":"2017-07-29 11:38:37","location":" -38.99517, 171.72671  "}
{"date":"2016-10-11 04:09:28","location":" -14.86058, 112.75848  "}
{"date":"2017-10-19 14:04:33","location":" -71.72156, -36.27381  "}
{"date":"2017-07-02 12:37:48","location":" -21.42197, -127.17672 "}

另请参阅

使用 KafkaExport 生成数据

10.4 - KafkaJSONParser

KafkaJSONParser 会解析 JSON 格式的 Kafka 消息并将其加载到常规 Vertica 表或 Vertica Flex 表中。

语法

KafkaJSONParser(
        [enforce_length=Boolean]
        [, flatten_maps=Boolean]
        [, flatten_arrays=Boolean]
        [, start_point=string]
        [, start_point_occurrence=integer]
        [, omit_empty_keys=Boolean]
        [, reject_on_duplicate=Boolean]
        [, reject_on_materialized_type_error=Boolean]
        [, reject_on_empty_key=Boolean]
        [, key_separator=char]
        [, suppress_nonalphanumeric_key_chars=Boolean]
        )
enforce_length
设置为 TRUE 时,如果加载的数据太宽而无法容纳到其所在的列中,则拒绝该行。默认设置为 FALSE,以截断任何太宽而无法容纳到其所在列的数据。
flatten_maps
如果设置为 TRUE,则将所有 JSON 映射扁平化。
flatten_arrays
如果设置为 TRUE,则将 JSON 数组扁平化。
start_point
指定解析器应解析的 JSON 数据中的键。解析器仅提取与 start_point 键关联的值内的数据。它会解析数据内 start_point 键的所有实例的值。
start_point_occurrence
整数值,指示由 start_point 参数指定的键的出现位置,解析器应从该位置开始解析。例如,如果将此值设置为 4,则解析器将仅从 start_point 键第五次出现时开始加载数据。仅在同时提供 start_point 参数时才有效。
omit_empty_keys
如果设置为 TRUE,则忽略加载数据中不含值集的所有键。
reject_on_duplicate
如果设置为 TRUE,则拒绝含有重复键名称的行。键名不区分大小写,因此键“mykey”和“MyKey”将被视为重复。
reject_on_materialized_type_error
如果设置为 TRUE,则在数据含有与现有实体化列匹配的键,但其中一个键无法映射到实体化列的数据类型时,拒绝该行。
reject_on_empty_key
如果设置为 TRUE,则拒绝含有键但不含值的所有行。
key_separator
用作键值之间的分隔符的单个字符,而不是默认句点 (.) 字符。
suppress_nonalphanumeric_key_chars
如果设置为 TRUE,则将 JSON 键值中的所有非字母数字字符替换为下划线 (_) 字符。

有关详细信息,请参阅JSON 数据

以下示例演示了如何从 Kafka 加载 JSON 数据。该语句中的参数根据加载情况定义了以下事项:

  • 将数据加载到预先存在且名为 logs 的表中。

  • KafkaSource 会从名为 server_log 的源中的单个分区流式传输数据。

  • 用于加载数据的 Kafka 代理将通过端口 9092 在名为 kafka01 的主机上运行。

  • KafkaSource 在 10 秒后或到达流的结尾时停止加载数据,以先发生者为准。

  • KafkJSONParser 会将 JSON 数据中的任何数组或映射扁平化。

=> COPY logs SOURCE KafkaSource(stream='server_log|0|0',
                                stop_on_eof=true,
                                duration=interval '10 seconds',
                                brokers='kafka01:9092')
   PARSER KafkaJSONParser(flatten_arrays=True, flatten_maps=True);

10.5 - KafkaListManyTopics

从 Kafka 代理检索有关所有主题的信息。此函数列出 Kafka 群集中定义的所有主题,以及它包含的分区数量和为这些主题提供服务的代理。

语法

KafkaListManyTopics('broker:port[;...]'
                    [USING PARAMETERS
                        [kafka_conf='kafka_configuration_setting'
                        [, timeout=timeout_sec]])
broker
Kafka 群集中代理的主机名(或 IP 地址)
port
运行代理的端口号。
kafka_conf

采用 JSON 格式直接传递给 rdkafka 库的选项/值对的对象。这是 Vertica 用于与 Kafka 进行通信的库。您可以使用此参数直接设置通过 Vertica 与 Kafka 的集成所无法提供的配置选项。有关详细信息,请参阅直接设置 Kafka 库选项

timeout

等待 Kafka 群集做出响应的整数秒数。

示例

=> \x
Expanded display is on.
=> SELECT KafkaListManyTopics('kafka01.example.com:9092')
   OVER (PARTITION AUTO);
-[ RECORD 1 ]--+--------------------------------------------------
brokers        | kafka01.example.com:9092,kafka02.example.com:9092
topic          | __consumer_offsets
num_partitions | 50
-[ RECORD 2 ]--+--------------------------------------------------
brokers        | kafka01.example.com:9092,kafka02.example.com:9092
topic          | iot_data
num_partitions | 1
-[ RECORD 3 ]--+--------------------------------------------------
brokers        | kafka01.example.com:9092,kafka02.example.com:9092
topic          | test
num_partitions | 1

10.6 - KafkaListTopics

从 Kafka 代理获取可用主题列表。

语法

KafkaListTopics(USING PARAMETERS brokers='hostname:port[,hostname2:port2...]'
                                 [, kafka_conf='kafka_configuration_setting']
                                 [, timeout=timeout_sec])
brokers
要查询主题列表的代理的主机名和端口号。您可以使用逗号分隔的列表提供多个代理。但是,返回的列表将仅包含列表中最后一个代理服务的主题。
kafka_conf

采用 JSON 格式直接传递给 rdkafka 库的选项/值对的对象。这是 Vertica 用于与 Kafka 进行通信的库。您可以使用此参数直接设置通过 Vertica 与 Kafka 的集成所无法提供的配置选项。有关详细信息,请参阅直接设置 Kafka 库选项

timeout

等待 Kafka 群集做出响应的整数秒数。

示例

=> SELECT KafkaListTopics(USING PARAMETERS brokers='kafka1-01.example.com:9092')
          OVER ();
         topic         | num_partitions
-----------------------+----------------
 test                  |              1
 iot_data              |              1
 __consumer_offsets    |             50
 vertica_notifications |              1
 web_hits              |              1
(5 rows)

10.7 - KafkaOffsets

KafkaOffsets 用户定义的转换函数返回最近通过调用 KafkaSource 生成的加载操作统计信息。查询 KafkaOffsets 可查看最近加载操作生成的元数据。可以在每次调用 KafkaSource 后查询 KafkaOffsets,以查看有关该加载的信息。如果使用的是调度程序,还可以在 stream_microbatch_history 表中查看历史加载信息。

对于每个加载操作,KafkaOffsets 返回以下内容:

  • 源 kafka 主题

  • 源 kafka 分区

  • 起始偏移量

  • 结束偏移量

  • 加载的消息数

  • 读取的字节数

  • 加载操作的持续时间

  • 结束消息

  • 结束原因

以下示例演示了如何调用 KafkaOffsets 以显示使用 KafkaSource 加载的名为 web_test 的表的分区信息。

=> SELECT kpartition, start_offset, end_offset, msg_count, ending FROM (select KafkaOffsets() over()
   FROM web_test) AS stats ORDER BY kpartition;
 kpartition | start_offset | end_offset | msg_count |   ending
------------+--------------+------------+-----------+------------
          0 |           -2 |       9999 |      1068 | END_OFFSET

输出显示,KafkaSource 从 Kafka 加载了单个分区中的 1068 条消息(行)。KafkaSource 因为到达了结束偏移量而结束了数据加载。

10.8 - KafkaParser

KafkaParser 不解析从 Kafka 加载的数据。相反,它以 LONG VARCHAR 值的形式传递消息。当您希望将原始 Kafka 消息加载到 Vertica 进行进一步处理时,请使用此解析器。您可以将此解析器用作不支持的格式均可使用的通用解析器。

KafkaParser 不接受任何参数。

示例

以下示例将原始消息从名为 iot-data 的 Kafka 主题加载到一个名为 raw_iot 的表中。

=> CREATE TABLE raw_iot(message LONG VARCHAR);
CREATE TABLE
=> COPY raw_iot SOURCE KafkaSource(stream='iot-data|0|-2,iot-data|1|-2,iot-data|2|-2',
                                   brokers='docd01:6667,docd03:6667', stop_on_eof=TRUE)
                PARSER KafkaParser();
 Rows Loaded
-------------
        5000
(1 row)

=> select * from raw_iot limit 10;
              message
------------------------------------
 10039, 59, -68.951406, -19.270126
 10042, 40, -82.688712, 4.7187705
 10054, 6, -153.805268, -10.5173935
 10054, 71, -135.613150, 58.286458
 10081, 44, 130.288419, -77.344405
 10104, -5, 77.882598, -56.600744
 10132, 87, 103.530616, -69.672863
 10135, 6, -121.420382, 15.3229855
 10166, 77, -179.592211, 42.0477075
 10183, 62, 17.225394, -55.6644765
(10 rows)

10.9 - KafkaSource

KafkaSource UDL 可访问 Kafka 群集中的数据。所有 Kafka 解析器都必须使用 KafkaSource。KafkaSource 处理的消息长度必须至少为一个字节。对于零长度消息,KafkaSource 会将错误消息写入 vertica.log。

KafkaSource 的输出不能直接使用 Vertica 中的任何非 Kafka 解析器(如 FCSVPARSER)。KafkaParser 会生成其他有关流的元数据,解析器需要使用这些元数据才能正确解析数据。您必须使用 KafkaInsertDelimiters 等筛选器将数据转换为其他解析器可以处理的格式。有关更多示例,请参阅解析自定义格式

您可以使用 CLOSE_ALL_SESSIONS 等关闭会话函数来取消正在运行的 KafkaSource 数据加载。

语法

KafkaSource(stream='topic_name|partition|start_offset[|end_offset]'[, param=value [,...] ] )
stream
必需。将要加载的数据定义为一个或多个分区的逗号分隔列表。每个分区由三个必需值和一个可选值构成,用管道字符 (|) 进行分隔:
  • topic_name:要从其加载数据的 Kafka 主题的名称。您可以从同一流参数中的不同 Kafka 主题进行读取,但存在一些限制。有关详细信息,请参阅从同一流参数中的多个主题进行加载

  • partition:要复制的 Kafka 主题中的分区。

  • start_offset:Kafka 主题中加载开始处的偏移量。此偏移量包含在内(偏移量为 start_offset 的消息也会加载)。有关其他选项,请参阅下面的特殊起始偏移量值

  • end_offset:加载应结束处的可选偏移量。此偏移量不包含在内(将不加载偏移量为 end_offset 的消息)。
    要使用 end_offset 结束加载,必须为 stream 参数中的所有分区提供结束偏移量值。如果尝试为某些分区设置结束偏移量而不为其他分区设置偏移量值,则会导致出错。
    如果不指定结束偏移量,则必须使用 stop_on_eofduration 提供至少一个其他结束条件。

brokers
Kafka 群集中代理的 host:port 对的逗号分隔列表。Vertica 建议在与 Vertica 不同的计算机上运行 Kafka。

默认值: localhost:9092

duration
指定时间范围持续时间的间隔。在此指定时间过后,KafkaSource 会终止 COPY 语句。如果未设置此参数,则必须使用 stop_on_eof 来设置至少一个其他结束条件,或者改为指定结束偏移量。有关详细信息,请参阅下面的持续时间说明
executionparallelism
加载数据时要使用的线程数。通常,将此参数设置为介于 1 和节点要从其加载的分区数之间的整数值。将此参数设置为更小的值可限制用于处理任何 COPY 语句的线程数。此外,还会增加池中发出的短查询的吞吐量,特别是在并发执行多个查询时更是如此。

如果未指定此参数,Vertica 会自动为每个分区创建一个线程,直至到达资源池允许的限值。

如果为 KafkaSource 指定的值小于为调度程序资源池指定的值,则使用 KafkaSource 值。此值不能超过为调度程序的资源池指定的值。

stop_on_eof
确定 KafkaSource 在 COPY 语句到达文件结尾后是否终止该语句。如果未设置此值,则必须使用 duration 或通过提供结束偏移量来设置至少一个其他结束条件。

默认值: FALSE

group_id

Kafka 使用者组(Vertica 将向其报告消费消息进度)的名称。设置此值以禁用向 Kafka 使用者组报告进度。有关详细信息,请参阅通过使用者组监控 Vertica 消息使用情况

默认值: vertica_database-name

kafka_conf

采用 JSON 格式直接传递给 rdkafka 库的选项/值对的对象。这是 Vertica 用于与 Kafka 进行通信的库。您可以使用此参数直接设置通过 Vertica 与 Kafka 的集成所无法提供的配置选项。有关详细信息,请参阅直接设置 Kafka 库选项

fail_on_conf_parse_error

确定 kafka_conf 包含格式不正确的选项和值或者无效的配置属性时,该函数是否失败。

默认值:FALSE

有关接受的选项和值格式,请参阅直接设置 Kafka 库选项

有关有效配置属性的列表,请参阅 rdkafka GitHub 存储库

stream 参数的 start_offset 部分允许您从主题分区中的特定点开始加载消息。它还接受两个特殊偏移值之一:

  • -2 告知Set Snippet Variable Value in Topic从主题分区中最早可用的消息开始加载。当您需要从 Kafka 主题的分区中尽量加载更多消息时,此值十分有用。

  • -3 告知Set Snippet Variable Value in Topic从使用者组保存的偏移量开始加载。如果使用者组没有保存的偏移量,它会从主题分区中最早可用的消息开始加载。有关详细信息,请参阅通过使用者组监控 Vertica 消息使用情况

从同一流参数中的多个主题进行加载

只要遵循以下准则,便可从同一流参数中的多个 Kafka 主题进行加载:

  • 主题的数据必须采用相同的格式,因为要将数据从 KafkaSource 传递到同一解析器。例如,不能从一个采用 Avro 格式的主题和另一个采用 JSON 格式的主题加载数据。

  • 同样,如果要加载 Avro 数据并从注册表中指定外部架构,则需要小心谨慎。Avro 解析器在每次加载数据时接受一个架构。如果不同主题中的数据具有不同的架构,则其中一个主题中的所有数据都将被拒绝。

  • 不同主题中的数据应具有相同(或非常类似)的架构,尤其是在将数据加载到传统 Vertica 表中时更是如此。虽然可以将具有不同架构的数据加载到 Flex 表中,但只有在少数情况下,将不同的数据合并到一个表中才有意义。

持续时间说明

duration 参数适用于 Vertica 允许 KafkaSource 函数运行的时间长度。它通常反映整个加载语句所花费的时间。但是,如果 KafkaSource 要加载大量数据或数据需要大量处理和解析,则查询的整体运行时间可能会超过在 duration 中指定的时间量。

示例

以下示例演示了如何调用 KafkaSource 以使用以下选项将数据从 Kafka 加载到名为 web_table 的现有 Flex 表中:

  • 名为 web_hits 的流只有一个分区。

  • 加载将从流中的最早消息开始(通过传递 -2 作为起始偏移量来标识)。

  • 加载在到达偏移量为 1000 的消息时结束。

  • Kafka 群集的代理是 example.com 域中的 kafka01 和 kafka03。

  • 这些代理正在侦听端口 9092。

  • 如果在到达偏移量为 1000 的消息之前到达流的结尾,加载将结束。如果不提供此选项,连接器将等到 Kafka 发送偏移量为 1000 的消息为止。

  • 已加载的数据将发送到 KafkaJSONParser 进行处理。

=> COPY web_table
   SOURCE KafkaSource(stream='web_hits|0|-2|1000',
                      brokers='kafka01.example.com:9092,kafka03.example.com:9092',
                      stop_on_eof=true)
   PARSER KafkaJSONParser();
 Rows Loaded
-------------
        1000
(1 row)

要查看有关此加载操作的详细信息,请查询 KafkaOffsets。KafkaOffsets 会返回 Vertica 在最近一次调用 KafkaSource 期间从 Kafka 使用的消息的相关元数据:

=> SELECT KafkaOffsets() OVER();
  ktopic  | kpartition | start_offset | end_offset | msg_count | bytes_read |    duration     |   ending   |      end_msg
----------+------------+--------------+------------+-----------+------------+-----------------+------------+-------------------
 web_hits |          0 |            0 |        999 |      1000 |     197027 | 00:00:00.385365 | END_OFFSET | Last message read
(1 row)

msg_count 列用于验证 Vertica 是否已加载 1000 条消息,ending 列指示 Vertica 在到达偏移量为 1000 的消息时已停止使用消息。

10.10 - KafkaTopicDetails

从一个或多个 Kafka 代理检索有关指定主题的信息。此函数将列出有关主题分区的详细信息,以及为 Kafka 群集中的每个分区提供服务的 Kafka 代理。

语法

KafkaTopicDetails(USING PARAMETERS brokers='hostname:port[,hostname2:port2...]'
                                  , topic=topic_name
                                 [, kafka_conf='option=value[;option2=value2...]']
                                 [, timeout=timeout_sec])
brokers
Kafka 群集中代理的主机名(或 IP 地址)。
port
运行代理的端口号。
topic
您需要获取其详细信息的 Kafka 主题。
kafka_conf
要直接传递给 rdkafka 库的 option=value 对的分号分隔列表。这是 Vertica 用于与 Kafka 进行通信的库。您可以使用此参数直接设置通过 Vertica 与 Kafka 的集成所无法提供的配置选项。有关详细信息,请参阅直接设置 Kafka 库选项
timeout
等待 Kafka 群集做出响应的整数秒数。

示例

=> SELECT KafkaTopicDetails(USING PARAMETERS brokers='kafka1-01.example.com:9092',topic='iot_data') OVER();
 partition_id | lead_broker | replica_brokers | in_sync_replica_brokers
--------------+-------------+-----------------+-------------------------
            0 |           0 | 0               | 0
            1 |           1 | 1               | 1
            2 |           0 | 0               | 0
            3 |           1 | 1               | 1
            4 |           0 | 0               | 0
(5 rows)

11 - 数据流式传输架构表

每次创建调度程序 (--create) 时,Vertica 都会使用您指定的名称或默认的 stream_config 为该调度程序创建一个架构。每个架构都具有如下表:

11.1 - stream_clusters

此表列出群集和主机。使用 vkconfig 群集工具更改此表中的设置。有关详细信息,请参阅群集工具选项

示例

以下示例显示了一个群集及其关联主机。

=> SELECT * FROM stream_config.stream_clusters;

    id    |    cluster     |               hosts
 ---------+----------------+-----------------------------------
  2250001 | streamcluster1 | 10.10.10.10:9092,10.10.10.11:9092
(1 rows)

11.2 - stream_events

此表将来自调度程序的微批处理和其他重要事件记录到内部日志表中。

此表是从 kafka_config.kafka_events 重命名的。

示例

以下示例显示了 stream_events 表中的典型行。

=> SELECT * FROM stream_config.stream_events;
-[ RECORD 1 ]-+-------------
event_time    | 2016-07-17 13:28:35.548-04
log_level     | INFO
frame_start   |
frame_end     |
microbatch    |
message       | New leader registered for schema stream_config. New ID: 0, new Host: 10.20.30.40
exception     |
-[ RECORD 2 ]-+-------------
event_time    | 2016-07-17 13:28:45.643-04
log_level     | INFO
frame_start   | 2015-07-17 12:28:45.633
frame_end     | 2015-07-17 13:28:50.701-04
microbatch    |
message       | Generated tuples: test3|2|-2,test3|1|-2,test3|0|-2
exception     |
-[ RECORD 3 ]-+----------------
event_time    | 2016-07-17 14:28:50.701-04
log_level     | INFO
frame_start   | 2016-07-17 13:28:45.633
frame_end     | 2016-07-17 14:28:50.701-04
microbatch    |
message       | Total rows inserted: 0
exception     |

11.3 - stream_load_specs

此表描述了用户创建的加载规范。您可以使用 vkconfig 实用程序的加载规范工具来更改此表中的条目。

示例

此示例显示了可用于 Vertica 实例的加载规范。


SELECT * FROM stream_config.stream_load_specs;
-[ RECORD 1 ]-----+------------
id                | 1
load_spec         | loadspec2
filters           |
parser            | KafkaParser
parser_parameters |
load_method       | direct
message_max_bytes | 1048576
uds_kv_parameters |
-[ RECORD 2 ]-----+------------
id                | 750001
load_spec         | streamspec1
filters           |
parser            | KafkaParser
parser_parameters |
load_method       | TRICKLE
message_max_bytes | 1048576
uds_kv_parameters |

11.4 - stream_lock

此表由调度程序锁定。锁定可防止多个调度程序同时运行。锁定此表的调度程序使用自己的信息对它进行更新。

示例

=> SELECT * FROM weblog_sched.stream_lock;
 scheduler_id |       update_time       | process_info
--------------+-------------------------+--------------
            2 | 2018-11-08 10:12:36.033 |
(1 row)

11.5 - stream_microbatch_history

此表包含在此调度程序配置中执行的每个微批处理的历史记录。

示例

以下示例显示了 stream_microbatch_history 表中的典型行。


=> SELECT * FROM stream_config.stream_microbatch_history;

-[ RECORD 1 ]--+---------------------------
source_name             | streamsource1
source_cluster          | kafka-1
source_partition        | 0
start_offset            | 196
end_offset              | 196
end_reason              | END_OF_STREAM
partition_bytes         | 0
partition_messages      | 0
microbatch_id           | 1
microbatch              | mb_0
target_schema           | public
target_table            | kafka_flex_0
timeslice               | 00:00:09.892
batch_start             | 2016-07-28 11:31:25.854221
batch_end               | 2016-07-28 11:31:26.357942
last_batch_duration     | 00:00:00.379826
last_batch_parallelism  | 1
microbatch_sub_id       | 0
consecutive_error_count |
transaction_id          | 45035996275130064
frame_start             | 2016-07-28 11:31:25.751
frame_end               |
end_reason_message      |

-[ RECORD 2 ]--+---------------------------
source_name             | streamsource1
source_cluster          | kafka-1
source_partition        | 1
start_offset            | 197
end_offset              | 197
end_reason              | NETWORK_ISSUE
partition_bytes         | 0
partition_messages      | 0
microbatch_id           | 1
microbatch              | mb_0
target_schema           | public
target_table            | kafka_flex_0
timeslice               | 00:00:09.897
batch_start             | 2016-07-28 11:31:45.84898
batch_end               | 2016-07-28 11:31:46.253367
last_batch_duration     | 000:00:00.377796
last_batch_parallelism  | 1
microbatch_sub_id       | 0
consecutive_error_count |
transaction_id          | 45035996275130109
frame_start             | 2016-07-28 11:31:45.751
frame_end               |
end_reason_message      | Local: All brokers are down

11.6 - stream_microbatch_source_map

此表将微批处理映射到它们的关联源。

示例

以下示例显示了 stream_microbatch 表中的典型行。

SELECT * FROM stream_config.stream_microbatch_source_map;
microbatch | source
-----------+--------
         1 |      4
         3 |      2
(2 rows)

11.7 - stream_microbatches

此表包含与微批处理相关的配置数据。

示例

以下示例显示了典型 stream_microbatches 表中的一行。

=> select * from weblog_sched.stream_microbatches;
-[ RECORD 1 ]-----+----------
id                | 750001
microbatch        | weberrors
target            | 750001
load_spec         | 2250001
target_columns    |
rejection_schema  |
rejection_table   |
max_parallelism   | 1
enabled           | t
consumer_group_id |
-[ RECORD 2 ]-----+----------
id                | 1
microbatch        | weblog
target            | 1
load_spec         | 1
target_columns    |
rejection_schema  |
rejection_table   |
max_parallelism   | 1
enabled           | t
consumer_group_id | weblog_group

11.8 - stream_scheduler

此表包含与单个调度程序相关的元数据。

此表是从 kafka_config.kafka_scheduler 重命名的。此表曾包含一个名为 eof_timeout_ms 的列。现在已将它移除。

示例

以下示例显示了 stream_scheduler 表中的典型行。

=> SELECT * FROM weblog_sched.stream_scheduler;
-[ RECORD 1 ]------+-----------------------
version            | v9.2.1
frame_duration     | 00:05:00
resource_pool      | weblog_pool
config_refresh     | 00:05
new_source_policy  | FAIR
pushback_policy    | LINEAR
pushback_max_count | 5
auto_sync          | t
consumer_group_id  | vertica-consumer-group

11.9 - stream_scheduler_history

此表显示已启动的调度程序实例的历史记录。

此表是从 kafka_config.kafka_scheduler_history 重命名的。

示例

以下示例显示了 stream_scheduler_history 表中的典型行。

 SELECT * FROM stream_config.stream_scheduler_history;
   elected_leader_time   |     host     |     launcher      | scheduler_id | version
-------------------------+--------------+-------------------+--------------+---------
 2016-07-26 13:19:42.692 | 10.20.100.62 |                   |            0 | v8.0.0
 2016-07-26 13:54:37.715 | 10.20.100.62 |                   |            1 | v8.0.0
 2016-07-26 13:56:06.785 | 10.20.100.62 |                   |            2 | v8.0.0
 2016-07-26 13:56:56.033 | 10.20.100.62 | SchedulerInstance |            3 | v8.0.0
 2016-07-26 15:51:20.513 | 10.20.100.62 | SchedulerInstance |            4 | v8.0.0
 2016-07-26 15:51:35.111 | 10.20.100.62 | SchedulerInstance |            5 | v8.0.0
    (6 rows)

11.10 - stream_sources

此表包含与数据流式传输源相关的元数据。

此表以前名为 kafka_config.kafka_scheduler。

示例

以下示例显示了 stream_sources 表中的典型行。

select * from  stream_config.stream_sources;
-[ RECORD 1 ]--------------
   id         | 1
   source     | SourceFeed1
   cluster    | 1
   partitions | 1
   enabled    | t
-[ RECORD 2 ]--------------
   id         | 250001
   source     | SourceFeed2
   cluster    | 1
   partitions | 1
   enabled    | t

11.11 - stream_targets

此表包含所有 Vertica 目标表的元数据。

此表以前名为 kafka_config.kafka_targets。

示例

以下示例显示了 stream_tables 表中的典型行。

=> SELECT * FROM stream_config.stream_targets;
-[ RECORD 1 ]-----+---------------------
id                | 1
target_schema     | public
target_table      | stream_flex1
-[ RECORD 2 ]-----+---------------------
id                | 2
target_schema     | public
target_table      | stream_flex2