KafkaJSONParser

KafkaJSONParser 会解析 JSON 格式的 Kafka 消息并将其加载到常规 Vertica 表或 Vertica Flex 表中。

语法

KafkaJSONParser(
        [enforce_length=Boolean]
        [, flatten_maps=Boolean]
        [, flatten_arrays=Boolean]
        [, start_point=string]
        [, start_point_occurrence=integer]
        [, omit_empty_keys=Boolean]
        [, reject_on_duplicate=Boolean]
        [, reject_on_materialized_type_error=Boolean]
        [, reject_on_empty_key=Boolean]
        [, key_separator=char]
        [, suppress_nonalphanumeric_key_chars=Boolean]
        )
enforce_length
设置为 TRUE 时,如果加载的数据太宽而无法容纳到其所在的列中,则拒绝该行。默认设置为 FALSE,以截断任何太宽而无法容纳到其所在列的数据。
flatten_maps
如果设置为 TRUE,则将所有 JSON 映射扁平化。
flatten_arrays
如果设置为 TRUE,则将 JSON 数组扁平化。
start_point
指定解析器应解析的 JSON 数据中的键。解析器仅提取与 start_point 键关联的值内的数据。它会解析数据内 start_point 键的所有实例的值。
start_point_occurrence
整数值,指示由 start_point 参数指定的键的出现位置,解析器应从该位置开始解析。例如,如果将此值设置为 4,则解析器将仅从 start_point 键第五次出现时开始加载数据。仅在同时提供 start_point 参数时才有效。
omit_empty_keys
如果设置为 TRUE,则忽略加载数据中不含值集的所有键。
reject_on_duplicate
如果设置为 TRUE,则拒绝含有重复键名称的行。键名不区分大小写,因此键“mykey”和“MyKey”将被视为重复。
reject_on_materialized_type_error
如果设置为 TRUE,则在数据含有与现有实体化列匹配的键,但其中一个键无法映射到实体化列的数据类型时,拒绝该行。
reject_on_empty_key
如果设置为 TRUE,则拒绝含有键但不含值的所有行。
key_separator
用作键值之间的分隔符的单个字符,而不是默认句点 (.) 字符。
suppress_nonalphanumeric_key_chars
如果设置为 TRUE,则将 JSON 键值中的所有非字母数字字符替换为下划线 (_) 字符。

有关详细信息,请参阅JSON 数据

以下示例演示了如何从 Kafka 加载 JSON 数据。该语句中的参数根据加载情况定义了以下事项:

  • 将数据加载到预先存在且名为 logs 的表中。

  • KafkaSource 会从名为 server_log 的源中的单个分区流式传输数据。

  • 用于加载数据的 Kafka 代理将通过端口 9092 在名为 kafka01 的主机上运行。

  • KafkaSource 在 10 秒后或到达流的结尾时停止加载数据,以先发生者为准。

  • KafkJSONParser 会将 JSON 数据中的任何数组或映射扁平化。

=> COPY logs SOURCE KafkaSource(stream='server_log|0|0',
                                stop_on_eof=true,
                                duration=interval '10 seconds',
                                brokers='kafka01:9092')
   PARSER KafkaJSONParser(flatten_arrays=True, flatten_maps=True);