解析自定义格式
要处理 Kafka 数据流,解析器必须识别每条消息之间的边界。Vertica 提供的 Kafka 解析器可以识别 Avro、JSON 和原始数据格式的边界,但您的数据流可能会使用自定义格式。为解析自定义格式,Vertica 提供了筛选器,可在数据流到达解析器之前在数据流中插入边界信息。
Kafka 筛选器
Vertica 提供以下筛选器:
-
KafkaInsertDelimiters:在数据流中的每条消息之间插入用户指定的分隔符。该分隔符可以包含任意字符,具有任意长度。此解析器使用以下语法:
KafkaInsertDelimiters(delimiter = 'delimiter')
-
KafkaInsertLengths:在消息的开头插入消息长度(以字节为单位)。Vertica 按 big-endian 网络字节顺序将长度写入为 4 字节 uint32 值。例如,100 字节的消息会加上 0x00000064 前缀。此解析器使用以下语法:
KafkaInsertLengths()
注意
由于每个 Kafka 筛选器都需要来自流源的输入并输出非流源,因此无法在同一 COPY 语句中使用两个 Kafka 筛选器来处理 Kafka 数据流。除了使用其中一个 Kafka 筛选器之外,还可以在单个 COPY 语句中添加一个或多个用户定义的筛选器。请以逗号分隔的列表指定多个筛选器,并先列出 Vertica 筛选器。如果使用非 Kafka 解析器,则必须至少使用一个筛选器为解析器准备数据流,否则解析器将失败并返回错误。
示例
以下 COPY 语句将从名为 iot-data
的主题中的两个分区加载逗号分隔值。处理完两个分区中的所有消息后,加载将退出。KafkaInsertDelimiters 筛选器将在 Kafka 消息之间插入换行符,以将其转换为传统的数据行。该语句将使用标准 COPY 解析器以逗号分隔 CSV 值:
=> COPY kafka_iot SOURCE KafkaSource(stream='iot-data|0|-2,iot-data|1|-2',
brokers='kafka01:9092',
stop_on_eof=True)
FILTER KafkaInsertDelimiters(delimiter = E'\n')
DELIMITER ',';
Rows Loaded
-------------
3430
(1 row)