解析自定义格式

要处理 Kafka 数据流,解析器必须识别每条消息之间的边界。Vertica 提供的 Kafka 解析器可以识别 AvroJSON原始数据格式的边界,但您的数据流可能会使用自定义格式。为解析自定义格式,Vertica 提供了筛选器,可在数据流到达解析器之前在数据流中插入边界信息。

Kafka 筛选器

Vertica 提供以下筛选器:

  • KafkaInsertDelimiters:在数据流中的每条消息之间插入用户指定的分隔符。该分隔符可以包含任意字符,具有任意长度。此解析器使用以下语法:

    KafkaInsertDelimiters(delimiter = 'delimiter')
  • KafkaInsertLengths:在消息的开头插入消息长度(以字节为单位)。Vertica 按 big-endian 网络字节顺序将长度写入为 4 字节 uint32 值。例如,100 字节的消息会加上 0x00000064 前缀。此解析器使用以下语法:

    KafkaInsertLengths()

除了使用其中一个 Kafka 筛选器之外,还可以在单个 COPY 语句中添加一个或多个用户定义的筛选器。请以逗号分隔的列表指定多个筛选器,并先列出 Vertica 筛选器。如果使用非 Kafka 解析器,则必须至少使用一个筛选器为解析器准备数据流,否则解析器将失败并返回错误。

示例

以下 COPY 语句将从名为 iot-data 的主题中的两个分区加载逗号分隔值。处理完两个分区中的所有消息后,加载将退出。KafkaInsertDelimiters 筛选器将在 Kafka 消息之间插入换行符,以将其转换为传统的数据行。该语句将使用标准 COPY 解析器以逗号分隔 CSV 值:

=> COPY kafka_iot SOURCE KafkaSource(stream='iot-data|0|-2,iot-data|1|-2',
                                     brokers='kafka01:9092',
                                     stop_on_eof=True)
                  FILTER KafkaInsertDelimiters(delimiter = E'\n')
                  DELIMITER ',';
 Rows Loaded
-------------
        3430
(1 row)