手动使用来自 Kafka 的数据

可以使用 COPY 语句将流式传输数据从 Kafka 手动加载到 Vertica 中,就像可以从一个文件或其他源加载一组有限数据一样。与标准数据源不同,Kafka 数据以消息流的形式连续到达,您必须先对这些消息进行解析才能将它们加载到 Vertica 中。在 COPY 语句中使用 Kafka 函数准备数据流。

此示例以增量式构建一个 COPY 语句,它从名为 web_hits 的 Kafka 主题手动加载 JSON 编码数据。web_hits 主题流式传输网站请求的服务器日志。

有关将数据加载到 Vertica 中的信息,请参阅数据加载

创建目标表

要确定目标表架构,必须确定消息结构。下面是 web_hits 流的一个示例:

{"url": "list.jsp", "ip": "144.177.38.106", "date": "2017/05/02 20:56:00",
"user-agent": "Mozilla/5.0 (compatible; MSIE 6.0; Windows NT 6.0; Trident/5.1)"}
{"url": "search/wp-content.html", "ip": "215.141.172.28", "date": "2017/05/02 20:56:01",
"user-agent": "Opera/9.53.(Windows NT 5.2; sl-SI) Presto/2.9.161 Version/10.00"}

此主题流式传输 JSON 编码数据。由于 JSON 数据不一致,并且可能包含不可预测的增加值,因此请将该数据流存储在一个 Flex 表中。Flex 表动态接受数据中出现的其他字段。

以下语句创建一个名为 web_table 的 Flex 表来存储该数据流:

=> CREATE FLEX TABLE web_table();

在 COPY 语句的开头,添加 web_table 作为目标表:

COPY web_table

有关 Flex 表的详细信息,请参阅 Flex 表

定义 KafkaSource

COPY 语句的源始终是 KafkaSource。KafkaSource 接受有关数据流、Kafka 代理和其他处理选项的详细信息以连续加载数据,直到满足结束条件

流详细信息

stream 参数定义您希望从一个或多个主题分区中加载的数据段。每个 Kafka 主题都将其消息拆分为不同的分区以获得可扩展的吞吐量。Kafka 根据 Kafka 管理员设置的规则为每个主题保留一个消息待完成项。可以选择加载待完成项中的部分或全部消息,或者仅加载当前流式传输的消息。

对于每个分区,stream 参数要求以竖线 (|) 分隔的列表形式提供主题名称、主题分区和分区偏移量。(可选)可以提供一个结束偏移量作为停止从数据流加载的结束条件:

'stream='topic_name|partition|start_offset[|end_offset]'

要从 web_hits 主题的单个分区加载整个待完成项,请使用 SOURCE 关键字附加具有以下 stream 参数值的 KafkaSource:

COPY ...
SOURCE KafkaSource(stream='web_hits|0|-2', ...

在前面的示例中:

  • web_hits 是要从中加载数据的主题的名称。

  • 0 是要从中加载数据的主题分区。主题分区从 0 开始建立索引,web_hits 仅包含一个分区。

  • -2 加载整个待完成项。这是一个特殊的偏移量值,它通知 KafkaSource 以最早可用的消息偏移量开始加载。

加载多个分区

此示例虽然仅从一个分区加载,但是请务必了解如何在一个 COPY 语句中从多个分区加载。

要从同一主题中的其他分区(甚至是其他主题)加载,请提供逗号分隔的列表,其中包括主题名称、分区号和由竖线分隔的偏移量值。例如,以下 stream 实参从 web_hits 主题的分区 0 到分区 2 加载整个待完成项:

KafkaSource(stream='web_hits|0|-2,web_hits|1|-2,web_hits|2|-2'...

当在同一 COPY 语句中加载多个分区时,可以将 executionparallelism 参数设置为定义针对 COPY 语句创建的线程数。理想情况下,希望每个分区使用一个线程。可以选择不指定值,让 Vertica 根据资源池中的分区数和可用资源确定线程数。在此例中,只有一个分区,因此不需要额外的线程来加载数据。

添加 Kafka 代理

KafkaSource 需要 Kafka 群集中代理的主机名(或 IP 地址)和端口号。Kafka 代理是 Vertica 为了检索 Kafka 数据而访问的服务。在以下示例中,Kafka 群集有一个名为 kafka01.example.com 的代理,该代理在端口 9092 上运行。请将 brokers 参数和值附加到 COPY 语句:

COPY ...
SOURCE KafkaSource(stream='web_hits|0|-2',
                   brokers='kafka01.example.com:9092', ...

选择结束条件

因为数据从 Kafka 连续到达,所以从 Kafka 手动加载需要定义一个结束条件来指示何时停止加载数据。除了流详细信息中描述的结束偏移量,还可以选择:

  • 在设置的一段时间内复制尽可能多的数据。

  • 一直加载数据,直到在超时时间内没有新数据到达。

  • 加载所有可用数据,而不等待任何进一步的数据到达。

以下示例运行 10000 毫秒(10 秒)的 COPY 以获取数据样本。如果 COPY 语句能够在 10 秒内加载整个数据待完成项,那么它将剩余的时间用于在流式传输数据到达时进行加载。此值在 duration 参数中设置。附加 duration 值以完成 KafkaSource 定义:

COPY ...
SOURCE KafkaSource(stream='web_hits|0|-2',
                    brokers='kafka01.example.com:9092',
                    duration=interval '10000 milliseconds')

如果从 Kafka 启动具有较长持续时间的 COPY 语句,并且需要停止它,可以调用一个函数(例如 CLOSE_ALL_SESSIONS)来关闭其会话。

选择解析器

Kafka 不强制其数据流上的消息格式。消息通常采用 Avro 或 JSON 格式,但也可以采用任何格式。COPY 语句通常使用三个 Kafka 特定解析器之一:

因为 Kafka 解析器可以识别流式传输数据中的记录边界,所以其他解析器(如 Flex 解析器)不与 KafkaSource 的输出直接兼容。必须使用筛选器更改 KafkaSource 输出,其他解析器才能处理数据。有关详细信息,请参阅解析自定义格式

在以下示例中,web_hits 中的数据采用 JSON 格式编码,因此它使用 KafkaJSONParser。此值在 COPY 语句的 PARSER 子句中进行设置:

COPY ...
SOURCE ...
PARSER KafkaJSONParser()

存储拒绝的数据

Vertica 将解析器无法解析的原始 Kafka 消息连同有关拒绝原因的信息保存到拒绝表中。该表由 COPY 语句创建。以下示例将拒绝内容保存到名为 web_hits_rejections 的表中。此值在 COPY 语句的 REJECTED DATA AS TABLE 子句中设置:

COPY ...
SOURCE ...
PARSER ...
REJECTED DATA AS TABLE public.web_hits_rejections;

将数据流加载到 Vertica 中

以下步骤使用前面几个部分中以增量方式构建的 COPY 语句加载 web_hits 主题中的 JSON 数据 10 秒:

  1. 执行 COPY 语句:

    => COPY web_table
       SOURCE KafkaSource(stream='web_hits|0|-2',
                          brokers='kafka01.example.com:9092',
                          duration=interval '10000 milliseconds')
       PARSER KafkaJSONParser()
       REJECTED DATA AS TABLE public.web_hits_rejections;
     Rows Loaded
    -------------
             800
    (1 row)
    
  2. 计算 Flex 表键:

    => SELECT compute_flextable_keys('web_table');
                  compute_flextable_keys
    --------------------------------------------------
     Please see public.web_table_keys for updated keys
    (1 row)
    

    有关更多详细信息,请参阅计算 Flex 表键

  3. 查询 web_table_keys 以返回这些键:

    => SELECT * FROM web_table_keys;
      key_name  | frequency | data_type_guess
    ------------+-----------+-----------------
     date       |       800 | Timestamp
     user_agent |       800 | Varchar(294)
     ip         |       800 | Varchar(30)
     url        |       800 | Varchar(88)
    (4 rows)
    
  4. 查询 web_table 以返回从 web_hits Kafka 主题加载的数据:

    => SELECT date, url, ip FROM web_table LIMIT 10;
            date         |                url                 |       ip
    ---------------------+------------------------------------+-----------------
     2021-10-15 02:33:31 | search/index.htm                   | 192.168.210.61
     2021-10-17 16:58:27 | wp-content/about.html              | 10.59.149.131
     2021-10-05 09:10:06 | wp-content/posts/category/faq.html | 172.19.122.146
     2021-10-01 08:05:39 | blog/wp-content/home.jsp           | 192.168.136.207
     2021-10-10 07:28:39 | main/main.jsp                      | 172.18.192.9
     2021-10-22 12:41:33 | tags/categories/about.html         | 10.120.75.17
     2021-10-17 09:41:09 | explore/posts/main/faq.jsp         | 10.128.39.196
     2021-10-13 06:45:36 | category/list/home.jsp             | 192.168.90.200
     2021-10-27 11:03:50 | category/posts/posts/index.php     | 10.124.166.226
     2021-10-26 01:35:12 | categories/search/category.htm     | 192.168.76.40
    (10 rows)