KafkaAvroParser

KafkaAvroParser 会解析 Avro 格式的 Kafka 消息并将其加载到常规 Vertica 表或 Vertica Flex 表中。

语法

KafkaAvroParser(param=value[,...])
enforce_length
设置为 TRUE 时,如果有值太宽而无法容纳到其所在的列中,则拒绝该行。使用默认设置 (FALSE) 时,解析器会截断任何太宽而无法容纳到列最大宽度内的值。
reject_on_materialized_type_error
设置为 TRUE 时,如果行中含有实体化列值,但该列值无法映射到实体化列的数据类型,则拒绝该行。
flatten_maps
如果设置为 TRUE,则将所有 Avro 映射扁平化。
flatten_arrays
如果设置为 TRUE,则将 Avro 数组扁平化。
flatten_records
如果设置为 TRUE,则将所有 Avro 记录扁平化。
external_schema
将 Avro 文件的架构指定为 JSON 字符串。如果未指定此参数,解析器假定每个消息都具有架构。如果使用的是架构注册表,请勿使用此参数。
codec
指定编写 Avro 文件所用的编解码器。有效值包括:
  • 'default' - 不压缩数据,因此不需要编解码器

  • 'deflate' - 使用 Deflate 编解码器压缩数据

  • 'snappy' - Snappy 压缩

with_metadata
如果设置为 TRUE,消息包括 Avro 基准、架构和对象元数据。默认情况下,KafkaAvroParser 会解析消息,但不包括架构和元数据。如果启用此参数,请使用 Avro API 编写消息,并确认它们仅包含 Avro 基准。默认值为 FALSE。
schema_registry_url
必需。指定 Confluent 架构注册表的 URL。需要使用此参数根据架构注册表版本加载数据。如果使用的是外部架构,请勿使用此参数。有关详细信息,请参考 在 Kafka 中使用架构注册表。
schema_registry_ssl_ca_path
进行 TLS 连接时需要提供。Vertica 节点文件系统上指向已对架构注册表服务器证书签名的证书颁发机构 (CA) 的路径。

每个 Vertica 节点都必须将 CA 存储在相同的文件系统路径中。

schema_registry_ssl_cert_path
Vertica 节点文件系统上指向由架构注册表信任的证书颁发机构 (CA) 所颁发的客户端证书的路径。
schema_registry_ssl_key_path
Vertica 服务器文件系统上指向使用 schema_registry_ssl_cert_path 定义的客户端证书的私钥的路径。
schema_registry_ssl_key_password_path
Vertica 服务器文件系统上指向使用 schema_registry_ssl_key_path 定义的私钥的可选密码的路径。
schema_registry_subject
在架构注册表中,表示用于加载数据的架构的主题。
schema_registry_version
在架构注册表中,表示用于加载数据的架构的版本。
key_separator
设置用作键之间的分隔符的字符。

数据类型

KafkaAvroParser 支持与 favroparser 相同的数据类型。有关详细信息,请参阅Avro 数据

示例

以下示例演示了如何以 Avro 格式从 Kafka 加载数据。该语句:

  • 会将数据加载到名为 weather_logs 的现有 Flex 表中。

  • 会从默认 Kafka 代理(通过端口 9092 在本地系统上运行)复制数据。

  • 源被命名为 temperature。

  • 源具有一个分区。

  • 加载从偏移量 0 处开始。

  • 加载在 10 秒后或在加载到达源结尾后结束,以先发生者为准。

  • KafkaAvroParser 不会将它在源中找到的任何数组、映射或记录扁平化。

  • 数据的架构将在语句中以 JSON 字符串的形式提供。该架构定义了名为 Weather 的记录类型,其中包含表示工作站名称、时间和温度的字段。

  • 被拒绝的数据行将保存到名为 t_rejects1 的表中。

=> COPY weather_logs
   SOURCE KafkaSource(stream='temperature|0|0', stop_on_eof=true,
                      duration=interval '10 seconds')
   PARSER KafkaAvroParser(flatten_arrays=False, flatten_maps=False, flatten_records=False,
                          external_schema=E'{"type":"record","name":"Weather","fields":'
                                           '[{"name":"station","type":"string"},'
                                           '{"name":"time","type":"long"},'
                                           '{"name":"temp","type":"int"}]}')
   REJECTED DATA AS TABLE "t_rejects1";