KafkaAvroParser
The KafkaAvroParser parses Avro-formatted Kafka messages and loads them into a regular Vertica table or a Vertica flex table.
Syntax
KafkaAvroParser(param=value[,...])
enforce_length
- When set to TRUE, rejects the row if any value is too wide to fit into its column. When set to FALSE (the default), the parser truncates any value that is too wide to fit within the column's maximum width.
enable_chunker
- If set to TRUE, can improve parsing performance, especially when parsing large/complex Kafka messages.
reject_on_materialized_type_error
- When set to TRUE, rejects the row if it contains a materialized column value that cannot be mapped into the materialized column's data type.
flatten_maps
- If set to TRUE, flattens all Avro maps.
flatten_arrays
- If set to TRUE, flattens Avro arrays.
flatten_records
- If set to TRUE, flattens all Avro records.
external_schema
- The schema of the Avro file as a JSON string. If this parameter is not specified, the parser assumes that each message includes its own schema. If you are using a schema registry, do not use this parameter.
codec
- The codec in which the Avro file was written. Valid values are:
-
default: Data is not compressed and codec is not needed
-
deflate: Data is compressed using the deflate codec
-
snappy: Snappy compression
Note
This option is mainly provided for backwards compatibility. You usually have Kafka compress data at the message level, and have KafkaSource decompress the message for you.
with_metadata
- If set to TRUE, messages include Avro datum, schema, and object metadata. By default, the KafkaAvroParser parses messages without including schema and metadata. If you enable this parameter, write your messages using the Avro API and confirm they contain only Avro datum. The default value is FALSE.
schema_registry_url
- Required, the URL of the Confluent schema registry. This parameter is required to load data based on a schema registry version. If you are using an external schema, do not use this parameter. For more information, refer to Avro Schema Registry.
Note
TLS connections must use the HTTPS protocol.
schema_registry_ssl_ca_path
- Required for TLS connections, the path on the Vertica node's file system to a directory containing one or more hashed certificate authority (CA) certificates that signed the schema registry's server certificate. Each Vertica node must store hashed CA certificates on the same path.
Important
In some circumstances, you might receive a validation error stating that the KafkaAvroParser cannot locate your CA certificate. To correct this error, store your CA certificate in your operating system's default bundle. For example, store the CA certificate in /etc/pki/tls/certs/ca-bundle.crt on Red Hat operating systems.
For details on hashed CA certificates, see Hashed CA certificates.
schema_registry_ssl_cert_path
- Path on the Vertica node's file system to a client certificate issued by a certificate authority (CA) that the schema registry trusts.
schema_registry_ssl_key_path
- Path on the Vertica server file system to the private key for the client certificate defined with schema_registry_ssl_cert_path.
schema_registry_ssl_key_password_path
- Path on the Vertica server file system to the optional password for the private key defined with schema_registry_ssl_key_path.
schema_registry_subject
- In the schema registry, the subject of the schema to use for data loading.
schema_registry_version
- In the schema registry, the version of the schema to use for data loading.
key_separator
- Sets the character to use as the separator between keys.
Data types
KafkaAvroParser supports the same data types as the favroparser. For details, see Avro data.
Example
The following example demonstrates loading data from Kafka in an Avro format. The statement:
-
Loads data into an existing flex table named weather_logs.
-
Copies data from two Kafka brokers running on the host kafka01, on ports 9092 and 9093.
-
The source is named temperature.
-
The source has a single partition.
-
The load starts from offset 0.
-
The load ends either after 10 seconds or the load reaches the end of the source, whichever occurs first.
-
The KafkaAvroParser does not flatten any arrays, maps, or records it finds in the source.
-
The schema for the data is provided in the statement as a JSON string. It defines a record type named Weather that contains fields for a station name, a time, and a temperature.
-
Rejected rows of data are saved to a table named t_rejects1.
=> COPY weather_logs
SOURCE KafkaSource(stream='temperature|0|0', brokers='kafka01:9092, kafka01:9093', stop_on_eof=true,
duration=interval '10 seconds')
PARSER KafkaAvroParser(flatten_arrays=False, flatten_maps=False, flatten_records=False,
external_schema=E'{"type":"record","name":"Weather","fields":'
'[{"name":"station","type":"string"},'
'{"name":"time","type":"long"},'
'{"name":"temp","type":"int"}]}')
REJECTED DATA AS TABLE "t_rejects1";
Hashed CA certificates
Some parameters, such as schema_registry_ssl_ca_path, require hashed CA certificates rather than the CA certificates themselves. A hashed CA certificate is a symbolic link to the original CA certificate. This symbolic link must have the following naming scheme:
CA_hash.0
For example, if the hash for ca_cert.pem is 9741086f, the hashed CA certificate would be 9741086f.0, a symbolic link to ca_cert.pem.
For details, see the OpenSSL 1.1 or 1.0 documentation.
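The naming scheme can be checked from a shell. The following is a minimal sketch rather than anything shipped with Vertica or OpenSSL: has_hashed_link is a hypothetical helper name, and the sketch assumes that openssl is on the PATH. It computes the hash of a certificate and confirms that the directory holds a matching CA_hash.0 symbolic link that resolves to the same certificate.

```shell
# Hypothetical helper (not part of Vertica or OpenSSL).
# Usage: has_hashed_link /my_ca_certs/ca_cert.pem /my_ca_certs
# Returns 0 if the directory contains <hash>.0 as a symbolic link to the certificate.
has_hashed_link() {
    cert=$1
    dir=$2
    # Print the certificate's hash, for example 9741086f.
    hash=$(openssl x509 -hash -noout -in "$cert") || return 2
    link="$dir/$hash.0"
    # The entry must be a symbolic link, and its target must exist.
    [ -L "$link" ] || return 1
    [ -e "$link" ] || return 1
    # Compare fingerprints to confirm the link resolves to the same certificate.
    want=$(openssl x509 -fingerprint -noout -in "$cert") || return 2
    got=$(openssl x509 -fingerprint -noout -in "$link") || return 1
    [ "$want" = "$got" ]
}
```

A nonzero return value means the link is missing, dangling, or points at a different certificate, so the directory would need to be hashed again before it is passed to schema_registry_ssl_ca_path.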
Hashing CA certificates
The procedure for hashing CA certificates varies between versions of openssl. You can find your version of openssl with:
$ openssl version
For openssl 1.1 or higher, use openssl rehash. For example, if the directory /my_ca_certs/ contains ca_cert.pem, you can hash and symbolically link to it with:
$ openssl rehash /my_ca_certs/
This adds the hashed CA certificate to the directory:
$ ls -l
total 8
lrwxrwxrwx 1 ver ver 8 Mar 13 14:41 9da13359.0 -> ca_cert.pem
-rw-r--r-- 1 ver ver 1245 Mar 13 14:41 ca_cert.pem
For openssl 1.0, you can use openssl x509 -hash -noout -in ca_cert.pem to retrieve the hash and then create a symbolic link to the CA certificate. For example:
-
Run the following command to retrieve the hash of the CA certificate ca_cert.pem:
$ openssl x509 -hash -noout -in /my_ca_certs/ca_cert.pem
9741086f
-
Create a symbolic link to /my_ca_certs/ca_cert.pem:
$ ln -s ca_cert.pem /my_ca_certs/9741086f.0
This adds the hashed CA certificate to the directory:
$ ls -l
total 8
lrwxrwxrwx 1 ver ver 11 Mar 13 13:41 9741086f.0 -> ca_cert.pem
-rw-r--r-- 1 ver ver 1220 Mar 13 13:41 ca_cert.pem
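The two openssl 1.0 steps can also be combined into a loop over a directory. The sketch below is illustrative only: hash_ca_dir is a hypothetical helper name, and it assumes that openssl is on the PATH, that CA certificates use the .pem extension, and that no two certificates in the directory share a hash (so the .0 suffix is always free).

```shell
# Hypothetical helper (not part of Vertica or OpenSSL).
# Usage: hash_ca_dir /my_ca_certs
# Creates a <hash>.0 symbolic link for each certificate in the directory.
hash_ca_dir() {
    dir=$1
    for cert in "$dir"/*.pem; do
        # Skip the unexpanded pattern when the directory has no .pem files.
        [ -f "$cert" ] || continue
        # Retrieve the hash; skip files that are not certificates (for example, keys).
        hash=$(openssl x509 -hash -noout -in "$cert" 2>/dev/null) || continue
        # Use a relative target so the link stays valid if the directory is moved.
        # -f replaces a link left over from an earlier run.
        ln -sf "$(basename "$cert")" "$dir/$hash.0"
    done
}
```

Because the link target is relative, the same directory can be copied to the same path on every Vertica node without recreating the links.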