KafkaOffsets

KafkaOffsets 用户定义的转换函数返回最近通过调用 KafkaSource 生成的加载操作统计信息。查询 KafkaOffsets 可查看最近加载操作生成的元数据。可以在每次调用 KafkaSource 后查询 KafkaOffsets,以查看有关该加载的信息。如果使用的是调度程序,还可以在 stream_microbatch_history 表中查看历史加载信息。

对于每个加载操作,KafkaOffsets 返回以下内容:

  • 源 kafka 主题

  • 源 kafka 分区

  • 起始偏移量

  • 结束偏移量

  • 加载的消息数

  • 读取的字节数

  • 加载操作的持续时间

  • 结束消息

  • 结束原因

以下示例演示了如何调用 KafkaOffsets 以显示使用 KafkaSource 加载的名为 web_test 的表的分区信息。

=> SELECT kpartition, start_offset, end_offset, msg_count, ending FROM (select KafkaOffsets() over()
   FROM web_test) AS stats ORDER BY kpartition;
 kpartition | start_offset | end_offset | msg_count |   ending
------------+--------------+------------+-----------+------------
          0 |           -2 |       9999 |      1068 | END_OFFSET

输出显示,KafkaSource 从 Kafka 加载了单个分区中的 1068 条消息(行)。KafkaSource 因为到达了结束偏移量而结束了数据加载。