CREATE DATA LOADER FOR KAFKA

CREATE DATA LOADER FOR KAFKA creates an automatic data loader for Kafka that executes a COPY statement when new data files appear in a specified location. The loader records which files have already been successfully loaded and skips them.

You can execute a data loader directly with EXECUTE DATA LOADER, or on a schedule by calling it from a stored procedure that a scheduled trigger runs.

Syntax

CREATE [ OR REPLACE ] DATA LOADER [ IF NOT EXISTS ]
   [schema.]name
   [ RETENTION INTERVAL monitoring-retention ]
   AS copy-statement with kafka source

Example

   CREATE DATA LOADER test_loader AS
      COPY myschema.mytable
      SOURCE KafkaSource(
         stream='mytopic',
         brokers='mykafka.example.com:9092',
         stop_on_eof=true,
         kafka_conf='sasl.username=admin;sasl.password=admin;sasl.mechanism=SCRAM-SHA-512;security.protocol=SASL_SSL')
      PARSER KafkaAvroParser(schema_registry_url='https://mykafkaschema.example.com');
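
Creating a loader does not load any data by itself. The following is a minimal sketch of running the loader once on demand, assuming the test_loader from the example above exists. It uses the same EXECUTE DATA LOADER statement that the scheduling example on this page wraps in a stored procedure.

```sql
-- Run the loader once. It executes the COPY statement stored in its definition.
EXECUTE DATA LOADER test_loader;
```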
 

Arguments

OR REPLACE
If a data loader with the same name in the same schema exists, replace it. The original data loader's monitoring table is dropped.

This option cannot be used with IF NOT EXISTS.

IF NOT EXISTS

If an object with the same name exists, return without creating the object. If you do not use this directive and the object already exists, Vertica returns with an error message.

The IF NOT EXISTS clause is useful for SQL scripts where you might not know if the object already exists. The ON ERROR STOP directive can be helpful in scripts.

This option cannot be used with OR REPLACE.
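
The two options cover different situations: IF NOT EXISTS leaves an existing loader untouched, while OR REPLACE discards it and creates a new one. The sketch below illustrates both under the syntax shown above. The loader, table, topic, and broker names are placeholders taken from the example on this page, not values your cluster is guaranteed to have.

```sql
-- Safe to rerun in a setup script: does nothing if test_loader already exists.
CREATE DATA LOADER IF NOT EXISTS test_loader AS
   COPY myschema.mytable
   SOURCE KafkaSource(stream='mytopic',
                      brokers='mykafka.example.com:9092',
                      stop_on_eof=true)
   PARSER KafkaAvroParser(schema_registry_url='https://mykafkaschema.example.com');

-- Redefines test_loader. The original loader's monitoring table is dropped.
CREATE OR REPLACE DATA LOADER test_loader AS
   COPY myschema.mytable
   SOURCE KafkaSource(stream='mytopic',
                      brokers='mykafka.example.com:9092',
                      stop_on_eof=true)
   PARSER KafkaAvroParser(schema_registry_url='https://mykafkaschema.example.com');
```

Combining the two options in a single statement is not allowed.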

schema
Schema containing the data loader. The default schema is public.
name
Name of the data loader.
RETENTION INTERVAL monitoring-retention
How long to keep records in the events table. DATA_LOADER_EVENTS records events for all data loaders, but each data loader has its own retention interval.

Default: 14 days
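
As an illustration, the sketch below sets a shorter retention interval and then inspects the events table. It assumes that monitoring-retention is written as an interval literal such as '7 days' and that DATA_LOADER_EVENTS can be queried without a schema qualifier. The loader name short_retention_loader is hypothetical. Verify these details against the reference for your Vertica version.

```sql
-- Assumption: the retention interval is given as an interval literal.
CREATE DATA LOADER short_retention_loader
   RETENTION INTERVAL '7 days'
   AS COPY myschema.mytable
   SOURCE KafkaSource(stream='mytopic',
                      brokers='mykafka.example.com:9092',
                      stop_on_eof=true)
   PARSER KafkaAvroParser(schema_registry_url='https://mykafkaschema.example.com');

-- Events for all data loaders are recorded in this table.
SELECT * FROM DATA_LOADER_EVENTS;
```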

AS copy-statement
The COPY statement that the loader executes. For a Kafka data loader, the statement reads from a Kafka source (SOURCE KafkaSource) instead of a FROM clause with a file glob.

Privileges

Non-superuser: CREATE privileges on the schema.
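
A superuser or the schema owner can grant the required privilege with a standard GRANT statement. In this sketch, loader_user is a hypothetical user name, and myschema is the schema used in the examples on this page.

```sql
-- Allow loader_user to create objects, including data loaders, in myschema.
GRANT CREATE ON SCHEMA myschema TO loader_user;
```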

Restrictions

  • COPY NO COMMIT is not supported.
  • Data loaders are executed with COPY ABORT ON ERROR.
  • The COPY statement must specify file paths. You cannot use COPY FROM VERTICA.

Scheduling Data Loader for Kafka

  1. Create the data loader for the Kafka source.
  2. Create a stored procedure that executes the data loader.
  3. Create a schedule with the required execution interval.
  4. Create a trigger that runs the stored procedure on the schedule.

Example

a. CREATE DATA LOADER mykafkaloader AS
     COPY myschema.mytable
     SOURCE KafkaSource(
       stream='mytopic',
       brokers='mykafka.example.com:9092',
       stop_on_eof=true,
       kafka_conf='sasl.username=admin;sasl.password=admin;sasl.mechanism=SCRAM-SHA-512;security.protocol=SASL_SSL')
     PARSER KafkaAvroParser(schema_registry_url='https://mykafkaschema.example.com');
b. CREATE PROCEDURE mykafkaloaderexec() AS $$
   BEGIN
     EXECUTE 'EXECUTE DATA LOADER mykafkaloader';
   END;
   $$;
c. CREATE SCHEDULE mykafkaschedule USING CRON '*/5 * * * *';
d. CREATE TRIGGER mykafkaloadertrigger ON SCHEDULE mykafkaschedule
     EXECUTE PROCEDURE mykafkaloaderexec() AS DEFINER;