KafkaOffsets
KafkaOffsets 用户定义的转换函数返回最近通过调用 KafkaSource 生成的加载操作统计信息。查询 KafkaOffsets 可查看最近加载操作生成的元数据。可以在每次调用 KafkaSource 后查询 KafkaOffsets,以查看有关该加载的信息。如果使用的是调度程序,还可以在 stream_microbatch_history 表中查看历史加载信息。
对于每个加载操作,KafkaOffsets 返回以下内容:
-
源 kafka 主题
-
源 kafka 分区
-
起始偏移量
-
结束偏移量
-
加载的消息数
-
读取的字节数
-
加载操作的持续时间
-
结束消息
-
结束原因
以下示例演示了如何调用 KafkaOffsets 以显示使用 KafkaSource 加载的名为 web_test 的表的分区信息。
=> SELECT kpartition, start_offset, end_offset, msg_count, ending FROM (select KafkaOffsets() over()
FROM web_test) AS stats ORDER BY kpartition;
kpartition | start_offset | end_offset | msg_count | ending
------------+--------------+------------+-----------+------------
0 | -2 | 9999 | 1068 | END_OFFSET
输出显示,KafkaSource 从 Kafka 加载了单个分区中的 1068 条消息(行)。KafkaSource 因为到达了结束偏移量而结束了数据加载。
注意
start_offset 列中显示的值不包含在内(加载具有所示偏移量的消息),end_offset 列中的值包含在内(加载具有所示偏移量的消息)。这与 KafkaSource 的stream
参数中指定的值相反。KafkaSource 和 KafkaOffset 的起始和结束偏移量的包含性之间的区别基于作业调度程序的需求。KafkaOffset 主要供作业调度程序使用,因此定义起始和结束偏移量值是为了让调度程序可以轻松地从中断处开始流式传输。