使用偏移量限制加载

Kafka 会维护用户可配置的消息积压。默认情况下,新创建的调度程序读取 Kafka 主题中的所有消息,包括积压的所有消息,而不仅仅是调度程序启动后流出的消息。通常,您想要的就是这样。

然而,在某些情况下,您可能只想将源的一部分流传输到表中。例如,假定您想要分析您的电子商务站点在特定日期和时间的 Web 流量。然而,您的 Kafka 主题包含比您想要分析的时间更早的 Web 访问记录。在这种情况下,可以使用偏移量,将需要的数据流传输到 Vertica 进行分析。

另一个常见的用例是当您已经从 Kafka 手动加载了一些数据时(请参阅手动使用来自 Kafka 的数据)。现在,您希望流传输新到达的所有数据。默认情况下,调度程序会重新加载之前加载的所有数据(假定仍然可以从 Kafka 获得)。此时,可以使用偏移量来告诉调度程序从手动数据加载停止的位置开始自动加载数据。

配置调度程序从某个偏移量开始流传输

vkconfig 脚本的微批处理工具有一个 --offset 选项,使用它可以指定希望调度程序开始加载的源中消息的索引。此选项接受逗号分隔的索引值列表。除非使用 --partition 选项,否则必须为源中的每个分区提供一个索引值。此选项可用于选择应用偏移量的分区。在微批处理中设置偏移量时,调度程序不能运行。

如果微批处理定义了多个群集,请使用 --cluster 选项选择偏移量选项适用于哪个群集。类似地,如果微批处理有多个源,则必须使用 --source 选项选择一个。

例如,假定您只想从一个名为 web_hits 的源加载最后 1000 条消息。为了方便起见,假定源只包含单个分区,而微批处理只定义了单个群集和单个主题。

第一个任务是确定流结尾的当前偏移量。可以在一个 Kafka 节点上通过调用 GetOffsetShell 类执行此操作,将时间参数设置为 -1(主题的结尾):

$ path to kafka/bin/kafka-run-class.sh kafka.tools.GetOffsetShell \
                                          --broker-list kafka01:9092,kafka03:9092 --time -1 \
                                          --topic web_hits

{metadata.broker.list=kafka01:9092,kafka03:9092, request.timeout.ms=1000,
 client.id=GetOffsetShell, security.protocol=PLAINTEXT}
web_hits:0:8932

还可以使用 GetOffsetShell 查找流中出现在时间戳之前的偏移量。

在上面的示例中,web_hits 主题的单个分区的结尾偏移量为 8932。如果我们希望加载来自源的最后 1000 条消息,我们需要将微批处理的偏移量设置为 8932 - 1001 或 7931。

计算偏移量后,即可在微批处理配置中设置了。下面的例子:

  • 关闭调度程序,其配置信息存储在 weblog.conf 文件中。

  • 使用微批处理实用程序设置起始偏移量。

  • 重新启动调度程序。

$ vkconfig shutdown --conf weblog.conf
$ vkconfig microbatch --microbatch weblog --update --conf weblog.conf --offset 7931
$ nohup vkconfig launch --conf weblog.conf >/dev/null 2>&1 &

如果目标表为空,或者在调度程序启动之前被截断,那么表中将有 1000 行(直到通过源流传输更多的消息):

=> select count(*) from web_hits;
 count
-------
  1000
(1 row)