Parsing custom formats
To process a Kafka data stream, the parser must identify the boundary between each message. Vertica provides Kafka parsers that can identify boundaries for Avro, JSON, and raw data formats, but your data stream might use a custom format. To parse custom formats, Vertica provides filters that insert boundary information in the data stream before it reaches the parser.
Kafka filters
Vertica provides the following filters:
- KafkaInsertDelimiters: Inserts a user-specified delimiter between each message in the data stream. The delimiter can contain any characters and be of any length. This filter uses the following syntax:
  KafkaInsertDelimiters(delimiter = 'delimiter')
- KafkaInsertLengths: Inserts the message length in bytes at the beginning of the message. Vertica writes the length as a 4-byte uint32 value in big-endian network byte order. For example, a 100-byte message is preceded by 0x00000064. This filter uses the following syntax:
  KafkaInsertLengths()
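The length framing that KafkaInsertLengths produces can be illustrated with a short Python sketch. The frame and unframe helpers below are illustrative only, not part of Vertica; they show how a 4-byte big-endian uint32 prefix delimits each message:

```python
import struct

def frame(message: bytes) -> bytes:
    """Prefix a message with its length as a 4-byte big-endian uint32,
    mirroring the framing that KafkaInsertLengths inserts."""
    return struct.pack(">I", len(message)) + message

def unframe(stream: bytes) -> list:
    """Split a length-prefixed byte stream back into individual messages."""
    messages, offset = [], 0
    while offset < len(stream):
        # Read the 4-byte length header, then slice out that many bytes.
        (length,) = struct.unpack_from(">I", stream, offset)
        offset += 4
        messages.append(stream[offset:offset + length])
        offset += length
    return messages

# A 100-byte message is preceded by 0x00000064, as described above.
framed = frame(b"x" * 100)
print(framed[:4].hex())  # 00000064
```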
Note
Because each Kafka filter requires input from a stream source and outputs a non-stream source, you cannot use both Kafka filters in the same COPY statement to process a Kafka data stream.

In addition to one of the Kafka filters, you can include one or more user-defined filters in a single COPY statement. Specify multiple filters as a comma-separated list, and list the Vertica filter first. If you use a non-Kafka parser, you must use at least one filter to prepare the data stream for the parser; otherwise, the parser fails and returns an error.
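For example, a COPY statement that chains a Vertica filter with a user-defined filter and parser might look like the following sketch. The table name, topic, MyCustomFilter, and MyCustomParser are hypothetical placeholders for your own objects:

```sql
=> COPY kafka_custom SOURCE KafkaSource(stream='custom-data|0|-2',
                                        brokers='kafka01:9092',
                                        stop_on_eof=True)
                     FILTER KafkaInsertDelimiters(delimiter = E'\n'), MyCustomFilter()
                     PARSER MyCustomParser();
```

Note that the Vertica filter (KafkaInsertDelimiters) appears first in the comma-separated list.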
Examples
The following COPY statement loads comma-separated values from two partitions in a topic named iot-data. The load exits after it processes all messages in both partitions. The KafkaInsertDelimiters filter inserts newlines between the Kafka messages to convert them into traditional rows of data. The statement uses the standard COPY parser to delimit CSV values with a comma:
=> COPY kafka_iot SOURCE KafkaSource(stream='iot-data|0|-2,iot-data|1|-2',
brokers='kafka01:9092',
stop_on_eof=True)
FILTER KafkaInsertDelimiters(delimiter = E'\n')
DELIMITER ',';
Rows Loaded
-------------
3430
(1 row)
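A similar load can use length framing instead of delimiters. In the following sketch, the table name, topic, and LengthDelimitedParser are hypothetical; the parser stands in for any user-defined parser that understands the 4-byte length prefix that KafkaInsertLengths inserts:

```sql
=> COPY kafka_events SOURCE KafkaSource(stream='event-data|0|-2',
                                        brokers='kafka01:9092',
                                        stop_on_eof=True)
                     FILTER KafkaInsertLengths()
                     PARSER LengthDelimitedParser();
```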