KafkaAvroParser

The KafkaAvroParser parses Avro-formatted Kafka messages and loads them into a regular Vertica table or a Vertica flex table.

The KafkaAvroParser parses Avro-formatted Kafka messages and loads them into a regular Vertica table or a Vertica flex table.

Syntax

KafkaAvroParser(param=value[,...])
enforce_length
When set to TRUE, rejects the row if any value is too wide to fit into its column. When using the default setting (FALSE) , the parser truncates any value that is too wide to fit within the column's maximum width.
enable_chunker
If set to TRUE, can improve parsing performance, especially when parsing large/complex Kafka messages.
reject_on_materialized_type_error
When set to TRUE, rejects the row if it contains a materialized column value that cannot be mapped into the materialized column's data type.
flatten_maps
If set to TRUE, flattens all Avro maps.
flatten_arrays
If set to TRUE, flattens Avro arrays.
flatten_records
If set to TRUE, flattens all Avro records.
external_schema
The schema of the Avro file as a JSON string. If this parameter is not specified, the parser assumes that each message has the schema on it. If you are using a schema registry, do not use this parameter.
codec
The codec in which the Avro file was written. Valid values are:
  • default: Data is not compressed and codec is not needed

  • deflate: Data is compressed using the deflate codec

  • snappy: Snappy compression

with_metadata
If set to TRUE, messages include Avro datum, schema, and object metadata. By default, the KafkaAvroParser parses messages without including schema and metadata. If you enable this parameter, write your messages using the Avro API and confirm they contain only Avro datum. The default value is FALSE.
schema_registry_url
Required, the URL of the Confluent schema registry. This parameter is required to load data based on a schema registry version. If you are using an external schema, do not use this parameter. For more information, refer to Avro Schema Registry.
schema_registry_ssl_ca_path
Required for TLS connections, the path on the Vertica node's file system to a directory containing one or more hashed certificate authority (CA) certificates that signed the schema registry's server certificate. Each Vertica node must store hashed CA certificates on the same path.

For details on hashed CA certificates, see Hashed CA Certificates.

schema_registry_ssl_cert_path
Path on the Vertica node's file system to a client certificate issued by a certificate authority (CA) that the schema registry trusts.
schema_registry_ssl_key_path
Path on the Vertica server file system to the private key for the client certificate defined with schema_registry_ssl_cert_path.
schema_registry_ssl_key_password_path
Path on the Vertica server file system to the optional password for the private key defined with schema_registry_ssl_key_path.
schema_registry_subject
In the schema registry, the subject of the schema to use for data loading.
schema_registry_version
In the schema registry, the version of the schema to use for data loading.
key_separator
Sets the character to use as the separator between keys.

Data types

KafkaAvroParser supports the same data types as the favroparser. For details, see Avro data.

Example

The following example demonstrates loading data from Kafka in an Avro format. The statement:

  • Loads data into an existing flex table named weather_logs.

  • Copies data from the default Kafka broker (running on the local system on port 9092).

  • The source is named temperature.

  • The source has a single partition.

  • The load starts from offset 0.

  • The load ends either after 10 seconds or the load reaches the end of the source, whichever occurs first.

  • The KafkaAvroParser does not flatten any arrays, maps, or records it finds in the source.

  • The schema for the data is provided in the statement as a JSON string. It defines a record type named Weather that contains fields for a station name, a time, and a temperature.

  • Rejected rows of data are saved to a table named t_rejects1.

=> COPY weather_logs
   SOURCE KafkaSource(stream='temperature|0|0', brokers=’kafka01:9092, kafka01:9093’, stop_on_eof=true,
                      duration=interval '10 seconds')
   PARSER KafkaAvroParser(flatten_arrays=False, flatten_maps=False, flatten_records=False,
                          external_schema=E'{"type":"record","name":"Weather","fields":'
                                           '[{"name":"station","type":"string"},'
                                           '{"name":"time","type":"long"},'
                                           '{"name":"temp","type":"int"}]}')
   REJECTED DATA AS TABLE "t_rejects1";

Hashed CA certificates

Some parameters like schema_registry_ssl_ca_path require hashed CA certificates rather than the CA certificates themselves. A hashed CA certificate is a symbolic link to the original CA certificate. This symbolic link must have the following naming scheme:

CA_hash.0

For example, if the hash for ca_cert.pem is 9741086f, the hashed CA certificate would be 9741086f.0, a symbolic link to ca_cert.pem.

For details, see the OpenSSL 1.1 or 1.0 documentation.

Hashing CA certificates

The procedure for hashing CA certificates varies between versions of openssl. You can find your version of openssl with:

$ openssl version

For openssl 1.1 or higher, use openssl rehash. For example, if the directory /my_ca_certs/ contains ca_cert.pem, you can hash and symbolically link to it with:

$ openssl rehash /my_ca_certs/

This adds the hashed CA certificate to the directory:

$ ls -l
total 8
lrwxrwxrwx 1 ver ver    8 Mar 13 14:41 9da13359.0 -> ca_cert.pem
-rw-r--r-- 1 ver ver 1245 Mar 13 14:41 ca_cert.pem

For openssl 1.0, you can use openssl x509 -hash -noout -in ca_cert.pem to retrieve the hash and then create a symbolic link to the CA certificate. For example:

  1. Run the following command to retrieve the hash of the CA certificate ca_cert.pem:

    $ openssl x509 -hash -noout -in /my_ca_certs/ca_cert.pem
    9741086f
    
  2. Create a symbolic link to /my_ca_certs/ca_cert.pem:

    $ ln /my_ca_certs/ca_cert.pem /my_ca_certs/9741086f.0
    

This adds the hashed CA certificate to the directory:

$ ls -l
total 8
-rw-r--r-- 2 ver ver 1220 Mar 13 13:41 9741086f.0 -> ca_cert.pem
-rw-r--r-- 2 ver ver 1220 Mar 13 13:41 ca_cert.pem