CREATE DATA LOADER FOR KAFKA
CREATE DATA LOADER with a Kafka source creates an automatic data loader that executes a COPY statement to load new data from a Kafka topic. The loader records which data has already been successfully loaded and skips it.
You can execute a data loader in the following ways:
- On demand: Use EXECUTE DATA LOADER to execute the data loader once.
- On a schedule: Use a scheduled stored procedure.
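For example, to run a loader once on demand (the loader name is illustrative and assumes a loader like the one created below):

EXECUTE DATA LOADER test_loader;

Previously loaded data is skipped automatically.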
Syntax
CREATE [ OR REPLACE ] DATA LOADER [ IF NOT EXISTS ]
[schema.]name
[ RETENTION INTERVAL monitoring-retention ]
AS copy-statement
Example
CREATE DATA LOADER test_loader AS
COPY myschema.mytable SOURCE KafkaSource(
    stream='mytopic',
    brokers='mykafka.example.com:9092',
    stop_on_eof=true,
    kafka_conf='sasl.username=admin;sasl.password=admin;sasl.mechanism=SCRAM-SHA-512;security.protocol=SASL_SSL')
PARSER KafkaAvroParser(schema_registry_url='https://mykafkaschema.example.com');
Arguments
OR REPLACE
- If a data loader with the same name in the same schema exists, replace it. The original data loader's monitoring table is dropped.
This option cannot be used with IF NOT EXISTS.
IF NOT EXISTS
- If an object with the same name exists, return without creating the object. If you do not use this directive and the object already exists, Vertica returns an error. The IF NOT EXISTS clause is useful for SQL scripts where you might not know if the object already exists. The ON ERROR STOP directive can be helpful in scripts.
This option cannot be used with OR REPLACE.
schema
- Schema containing the data loader. The default schema is public.
name
- Name of the data loader.
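For example, a setup script can attempt to create a loader without failing if it already exists. This is a sketch; the names are illustrative, and the COPY body follows the pattern of the earlier example:

CREATE DATA LOADER IF NOT EXISTS myschema.test_loader AS
COPY myschema.mytable SOURCE KafkaSource(
    stream='mytopic',
    brokers='mykafka.example.com:9092',
    stop_on_eof=true);

If myschema.test_loader already exists, the statement returns without changing it.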
RETENTION INTERVAL monitoring-retention
- How long to keep records in the events table. DATA_LOADER_EVENTS records events for all data loaders, but each data loader has its own retention interval.
Default: 14 days
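For example, to keep monitoring records in DATA_LOADER_EVENTS for one day instead of the default 14, the interval can be specified at creation time (names and interval are illustrative):

CREATE DATA LOADER myschema.kafka_loader
RETENTION INTERVAL '1 day'
AS COPY myschema.mytable SOURCE KafkaSource(
    stream='mytopic',
    brokers='mykafka.example.com:9092',
    stop_on_eof=true);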
AS copy-statement
- The COPY statement that the loader executes. For a Kafka data loader, the COPY statement reads from SOURCE KafkaSource rather than from file paths.
Privileges
Non-superuser: CREATE privileges on the schema.
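For example, a superuser can grant a non-superuser the privilege needed to create data loaders in a schema (user and schema names are illustrative):

GRANT CREATE ON SCHEMA myschema TO load_admin;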
Restrictions
- COPY NO COMMIT is not supported.
- Data loaders are executed with COPY ABORT ON ERROR.
- The COPY statement must specify file paths. You cannot use COPY FROM VERTICA.
Scheduling a data loader for Kafka
To run a Kafka data loader on a schedule:
- Create the data loader with a Kafka source.
- Create a stored procedure that executes the data loader.
- Create a schedule with the required execution interval.
- Create a trigger that runs the stored procedure on the schedule.
Note
EXECUTE DATA LOADER loads data from the latest partitions available in the topic.
Example
-- Create the data loader:
CREATE DATA LOADER mykafkaloader AS
COPY myschema.mytable SOURCE KafkaSource(
    stream='mytopic',
    brokers='mykafka.example.com:9092',
    stop_on_eof=true,
    kafka_conf='sasl.username=admin;sasl.password=admin;sasl.mechanism=SCRAM-SHA-512;security.protocol=SASL_SSL')
PARSER KafkaAvroParser(schema_registry_url='https://mykafkaschema.example.com');

-- Create a schedule that fires every five minutes:
CREATE SCHEDULE mykafkaschedule USING CRON '*/5 * * * *';

-- Create a stored procedure that executes the data loader:
CREATE PROCEDURE mykafkaloaderexec()
AS $$
BEGIN
    EXECUTE 'EXECUTE DATA LOADER mykafkaloader';
END;
$$;

-- Create a trigger that runs the procedure on the schedule:
CREATE TRIGGER mykafkaloadertrigger ON SCHEDULE mykafkaschedule
EXECUTE PROCEDURE mykafkaloaderexec() AS DEFINER;