Manually consume data from Kafka

You can manually load streaming data from Kafka into Vertica using a COPY statement, just as you can load a finite set of data from a file or other source. Unlike a standard data source, Kafka data arrives continuously as a stream of messages that you must parse before loading into Vertica. Use Kafka functions in the COPY statement to prepare the data stream.

This example incrementally builds a COPY statement that manually loads JSON-encoded data from a Kafka topic named web_hits. The web_hits topic streams server logs of web site requests.

For information about loading data into Vertica, see Data load.

Creating the target table

To determine the target table schema, you must identify the message structure. The following is a sample of the web_hits stream:

{"url": "list.jsp", "ip": "144.177.38.106", "date": "2017/05/02 20:56:00",
"user-agent": "Mozilla/5.0 (compatible; MSIE 6.0; Windows NT 6.0; Trident/5.1)"}
{"url": "search/wp-content.html", "ip": "215.141.172.28", "date": "2017/05/02 20:56:01",
"user-agent": "Opera/9.53.(Windows NT 5.2; sl-SI) Presto/2.9.161 Version/10.00"}

This topic streams JSON-encoded data. Because JSON data can be inconsistent and might contain unpredictable additional fields, store this data stream in a flex table. Flex tables dynamically accept additional fields that appear in the data.

The following statement creates a flex table named web_table to store the data stream:

=> CREATE FLEX TABLE web_table();

To begin the COPY statement, add the web_table as the target table:

COPY web_table

For more information about flex tables, see Flex tables.

Defining KafkaSource

The source of your COPY statement is always KafkaSource. KafkaSource accepts details about the data stream, Kafka brokers, and additional processing options to continuously load data until an end condition is met.

Stream details

The stream parameter defines the data segment that you want to load from one or more topic partitions. Each Kafka topic splits its messages into different partitions to get scalable throughput. Kafka keeps a backlog of messages for each topic according to rules set by the Kafka administrator. You can choose to load some or all of the messages in the backlog, or just load the currently streamed messages.

For each partition, the stream parameter requires the topic name, topic partition, and the partition offset as a list delimited by a pipe character (|). Optionally, you can provide an end offset as an end condition to stop loading from the data stream:

stream='topic_name|partition|start_offset[|end_offset]'

To load the entire backlog from a single partition of the web_hits topic, use the SOURCE keyword to add KafkaSource with the following stream parameter value:

COPY ...
SOURCE KafkaSource(stream='web_hits|0|-2', ...

In the previous example:

  • web_hits is the name of the topic to load data from.

  • 0 is the topic partition to load data from. Topic partitions are 0-indexed, and web_hits contains only one partition.

  • -2 loads the entire backlog. This is a special offset value that tells KafkaSource to start loading at the earliest available message offset.
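
The -2 offset loads everything the brokers still retain. To load only part of the backlog, you can instead supply explicit offsets using the same pipe-delimited syntax. The following sketch is illustrative only; the start offset of 100 and end offset of 200 are placeholder values, not part of this example's load:

COPY ...
SOURCE KafkaSource(stream='web_hits|0|100|200', ...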

Loading multiple partitions

This example loads from only one partition, but it is important to understand how to load from multiple partitions in a single COPY statement.

To load from additional partitions in the same topic, or even additional topics, supply a comma-separated list of topic name, partition number, and offset values delimited by pipe characters. For example, the following stream argument loads the entire message backlog from partitions 0 through 2 of the web_hits topic:

KafkaSource(stream='web_hits|0|-2,web_hits|1|-2,web_hits|2|-2'...

When you load multiple partitions in the same COPY statement, you can set the executionparallelism parameter to define the number of threads created for the COPY statement. Ideally, you want to use one thread per partition. You can choose to not specify a value and let Vertica determine the number of threads based on the number of partitions and the resources available in the resource pool. In this example, there is only one partition, so there's no need for additional threads to load data.
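
For example, a multi-partition load that requests one thread per partition might look like the following sketch. The three-partition stream value is carried over from the example above, and the executionparallelism value of 3 is shown only for illustration:

COPY ...
SOURCE KafkaSource(stream='web_hits|0|-2,web_hits|1|-2,web_hits|2|-2',
                   executionparallelism=3, ...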

Adding the Kafka brokers

KafkaSource requires the host names (or IP addresses) and port numbers of the brokers in your Kafka cluster. Kafka brokers are the services that Vertica accesses to retrieve the Kafka data. In this example, the Kafka cluster has one broker named kafka01.example.com, running on port 9092. Append the brokers parameter and value to the COPY statement:

COPY ...
SOURCE KafkaSource(stream='web_hits|0|-2',
                   brokers='kafka01.example.com:9092', ...

Choosing the end condition

Because data continuously arrives from Kafka, manual loads from Kafka require that you define an end condition that indicates when to stop loading data. In addition to the end offset described in Stream details, you can choose to:

  • Copy as much data as possible for a set duration of time.

  • Load data until no new data arrives within a timeout period.

  • Load all available data, and not wait for any further data to arrive.

This example runs COPY for 10000 milliseconds (10 seconds) to get a sample of the data. If the COPY statement is able to load the entire backlog of data in under 10 seconds, it spends the remaining time loading streaming data as it arrives. This value is set in the duration parameter. Append the duration value to complete the KafkaSource definition:

COPY ...
SOURCE KafkaSource(stream='web_hits|0|-2',
                    brokers='kafka01.example.com:9092',
                    duration=interval '10000 milliseconds')
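
Alternatively, to load all currently available messages and stop without waiting for more data to arrive, you can use the stop_on_eof parameter in place of a duration. A minimal sketch, shown only as an alternative to the duration-based example above:

COPY ...
SOURCE KafkaSource(stream='web_hits|0|-2',
                   brokers='kafka01.example.com:9092',
                   stop_on_eof=true)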

If you start a long-duration COPY statement from Kafka and need to stop it, you can call one of the functions that closes its session, such as CLOSE_ALL_SESSIONS.
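
For example, from another connection you can close every open session, or look up the loading session's ID in the SESSIONS system table and close just that session. The session ID shown below is a placeholder:

=> SELECT CLOSE_ALL_SESSIONS();

=> SELECT session_id, current_statement FROM sessions;
=> SELECT CLOSE_SESSION('example_session_id');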

Selecting a parser

Kafka does not enforce message formatting on its data streams. Messages are often in Avro or JSON format, but they could be in any format. Your COPY statement usually uses one of three Kafka-specific parsers: KafkaAvroParser, KafkaJSONParser, or KafkaParser.

Unlike the Kafka parsers, other parsers (such as the Flex parsers) cannot recognize record boundaries in streaming data, so they are not directly compatible with the output of KafkaSource. You must alter the KafkaSource output using filters before other parsers can process the data. See Parsing custom formats for more information.

In this example, the data in the web_hits topic is encoded in JSON format, so the COPY statement uses the KafkaJSONParser. This value is set in the COPY statement's PARSER clause:

COPY ...
SOURCE ...
PARSER KafkaJSONParser()
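
If the topic streamed Avro-encoded messages instead, the same clause would name the KafkaAvroParser. The following sketch is only an illustration of the clause and assumes each message embeds its own schema:

COPY ...
SOURCE ...
PARSER KafkaAvroParser()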

Storing rejected data

Vertica saves raw Kafka messages that the parser cannot parse to a rejects table, along with information about why they were rejected. The COPY statement creates this table. This example saves rejects to a table named web_hits_rejections. This value is set in the COPY statement's REJECTED DATA AS TABLE clause:

COPY ...
SOURCE ...
PARSER ...
REJECTED DATA AS TABLE public.web_hits_rejections;
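
After the load completes, you can query the rejections table to see which messages failed and why. A minimal sketch, assuming the standard rejection-table columns rejected_data and rejected_reason:

=> SELECT rejected_data, rejected_reason
   FROM public.web_hits_rejections;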

Loading the data stream into Vertica

The following steps load JSON data from the web_hits topic for 10 seconds using the COPY statement that was incrementally built in the previous sections:

  1. Execute the COPY statement:

    => COPY web_table
       SOURCE KafkaSource(stream='web_hits|0|-2',
                          brokers='kafka01.example.com:9092',
                          duration=interval '10000 milliseconds')
       PARSER KafkaJSONParser()
       REJECTED DATA AS TABLE public.web_hits_rejections;
     Rows Loaded
    -------------
             800
    (1 row)
    
  2. Compute the flex table keys:

    => SELECT compute_flextable_keys('web_table');
                  compute_flextable_keys
    --------------------------------------------------
     Please see public.web_table_keys for updated keys
    (1 row)
    

    For additional details, see Computing flex table keys.

  3. Query web_table_keys to return the keys:

    => SELECT * FROM web_table_keys;
      key_name  | frequency | data_type_guess
    ------------+-----------+-----------------
     date       |       800 | Timestamp
     user_agent |       800 | Varchar(294)
     ip         |       800 | Varchar(30)
     url        |       800 | Varchar(88)
    (4 rows)
    
  4. Query web_table to return the data loaded from the web_hits Kafka topic:

    => SELECT date, url, ip FROM web_table LIMIT 10;
            date         |                url                 |       ip
    ---------------------+------------------------------------+-----------------
     2021-10-15 02:33:31 | search/index.htm                   | 192.168.210.61
     2021-10-17 16:58:27 | wp-content/about.html              | 10.59.149.131
     2021-10-05 09:10:06 | wp-content/posts/category/faq.html | 172.19.122.146
     2021-10-01 08:05:39 | blog/wp-content/home.jsp           | 192.168.136.207
     2021-10-10 07:28:39 | main/main.jsp                      | 172.18.192.9
     2021-10-22 12:41:33 | tags/categories/about.html         | 10.120.75.17
     2021-10-17 09:41:09 | explore/posts/main/faq.jsp         | 10.128.39.196
     2021-10-13 06:45:36 | category/list/home.jsp             | 192.168.90.200
     2021-10-27 11:03:50 | category/posts/posts/index.php     | 10.124.166.226
     2021-10-26 01:35:12 | categories/search/category.htm     | 192.168.76.40
    (10 rows)