UDFilter class

The UDFilter class is responsible for reading raw input data from a source and preparing it to be loaded into Vertica or processed by a parser. This preparation may involve decompression, re-encoding, or any other sort of binary manipulation.

A UDFilter is instantiated by a corresponding FilterFactory on each host in the Vertica cluster that is performing filtering for the data source.

UDFilter methods

Your UDFilter subclass must override process() or processWithMetadata():

  • process() reads the raw input stream as one large file. If there are any errors or failures, the entire load fails.
    You can implement process() when the upstream source implements processWithMetadata(), but it might result in parsing errors.

  • processWithMetadata() is useful when the data source has metadata about record boundaries available in some structured format that's separate from the data payload. With this interface, the source emits a record length for each record in addition to the data.

    By implementing processWithMetadata() instead of process() in each phase, you can retain this record length metadata throughout the load stack, which enables a more efficient parse that can recover from errors on a per-message basis, rather than a per-file or per-source basis. KafkaSource and the Kafka parsers (KafkaAvroParser, KafkaJSONParser, and KafkaParser) use this mechanism to support per-Kafka-message rejections when individual Kafka messages are corrupted.

    Using processWithMetadata() with your UDFilter subclass enables you to write an internal filter that integrates the record length metadata from the source into the data stream, producing a single byte stream with boundary information to help parsers extract and process individual messages. KafkaInsertDelimiters and KafkaInsertLengths use this mechanism to insert message boundary information into Kafka data streams.
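To illustrate what integrating record length metadata into the data stream can look like, the following minimal sketch uses plain Python and does not depend on the Vertica SDK. The helpers insert_lengths and split_records are hypothetical names, and the 4-byte big-endian length prefix is an assumed framing chosen for illustration. It is not a description of how KafkaInsertLengths is implemented.

```python
import struct


def insert_lengths(payload, lengths):
    """Prefix each record in payload with its length as a 4-byte big-endian integer.

    payload is the concatenated record data; lengths is the per-record
    metadata that a source would emit alongside it (assumed format).
    """
    if sum(lengths) != len(payload):
        raise ValueError("record lengths do not cover the payload exactly")
    out = bytearray()
    pos = 0
    for n in lengths:
        out += struct.pack(">I", n)      # boundary information
        out += payload[pos:pos + n]      # the record itself
        pos += n
    return bytes(out)


def split_records(stream):
    """Recover the individual records from a length-prefixed stream."""
    records = []
    pos = 0
    while pos < len(stream):
        (n,) = struct.unpack_from(">I", stream, pos)
        pos += 4
        records.append(stream[pos:pos + n])
        pos += n
    return records


framed = insert_lengths(b"abcdefgh", [3, 1, 4])
records = split_records(framed)
```

Because every record carries its own boundary, a downstream consumer can skip or reject one damaged record without losing track of where the next one starts.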

Optionally, you can override other UDFilter class methods.

Filter execution

The following sections describe the execution sequence each time a user-defined filter is called. They assume a filter that overrides the process() method.

Setting Up
COPY calls setup() before the first time it calls process(). Use setup() to perform any necessary setup steps that your filter needs to operate, such as initializing data structures to be used during filtering. Your object might be destroyed and re-created during use, so make sure that your object is restartable.

Filtering Data
COPY calls process() repeatedly during query execution to filter data. The method receives two instances of the DataBuffer class among its parameters, an input and an output buffer. Your implementation should read from the input buffer, manipulate it in some manner (such as decompressing it), and write the result to the output. A one-to-one correlation between the number of bytes your implementation reads and the number it writes might not exist. The process() method should process data until it either runs out of data to read or runs out of space in the output buffer. When one of these conditions occurs, your method should return one of the following values defined by StreamState:

  • OUTPUT_NEEDED if the filter needs more room in its output buffer.

  • INPUT_NEEDED if the filter has run out of input data (but the data source has not yet been fully processed).

  • DONE if the filter has processed all of the data in the data source.

  • KEEP_GOING if the filter cannot proceed for an extended period of time. The method will be called again, so do not block indefinitely. If you do, then you prevent your user from canceling the query.

Before returning, your process() method must set the offset property in each DataBuffer. In the input buffer, set it to the number of bytes that the method successfully read. In the output buffer, set it to the number of bytes the method wrote. Setting these properties allows the next call to process() to resume reading and writing data at the correct points in the buffers.

Your process() method must also check the InputState value passed to it to determine whether more data remains in the data source. When this value is END_OF_FILE, the data remaining in the input buffer is the last data in the data source. After it has processed all of the remaining data, process() must return DONE.
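The contract described in this section can be sketched in plain Python. The sketch below does not use the Vertica SDK: the StreamState, InputState, and DataBuffer classes are simplified stand-ins written for this example (the real SDK types have their own interfaces), and the run() driver is a hypothetical imitation of how COPY calls process() repeatedly. The filter itself only upper-cases ASCII text, so the number of bytes read and written happens to match.

```python
from enum import Enum


class StreamState(Enum):        # stand-in for the SDK's StreamState
    INPUT_NEEDED = 1
    OUTPUT_NEEDED = 2
    DONE = 3
    KEEP_GOING = 4


class InputState(Enum):         # stand-in for the SDK's InputState
    OK = 1
    END_OF_FILE = 2


class DataBuffer:
    """Stand-in buffer: buf holds the bytes, offset marks progress."""

    def __init__(self, data=b"", size=None):
        self.buf = bytearray(data) if size is None else bytearray(size)
        self.offset = 0


def process(input_buf, input_state, output_buf):
    """Copy input to output in upper case until input or output space runs out."""
    readable = len(input_buf.buf) - input_buf.offset
    writable = len(output_buf.buf) - output_buf.offset
    n = min(readable, writable)
    start = input_buf.offset
    chunk = bytes(input_buf.buf[start:start + n]).upper()
    output_buf.buf[output_buf.offset:output_buf.offset + n] = chunk
    # Record how far we got so the next call resumes at the right place.
    input_buf.offset += n
    output_buf.offset += n
    if input_buf.offset < len(input_buf.buf):
        return StreamState.OUTPUT_NEEDED       # unread input remains
    if input_state == InputState.END_OF_FILE:
        return StreamState.DONE                # last data fully processed
    return StreamState.INPUT_NEEDED


def run(chunks, out_size=4):
    """Hypothetical driver: feed a non-empty list of chunks, draining output each call."""
    result = bytearray()
    state = None
    for i, chunk in enumerate(chunks):
        last = i == len(chunks) - 1
        in_state = InputState.END_OF_FILE if last else InputState.OK
        inp = DataBuffer(chunk)
        while True:
            out = DataBuffer(size=out_size)
            state = process(inp, in_state, out)
            result += out.buf[:out.offset]
            if state != StreamState.OUTPUT_NEEDED:
                break
    return bytes(result), state


output, final_state = run([b"hello ", b"world"])
```

The output buffer in this example is deliberately small (4 bytes) so that the OUTPUT_NEEDED path is exercised on every chunk.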

Tearing Down
COPY calls destroy() after the last time it calls process(). This method frees any resources reserved by the setup() or process() methods. Vertica calls this method after the process() method indicates it has finished filtering all of the data in the data stream.

If there are still data sources that have not yet been processed, Vertica may later call setup() on the object again. On subsequent calls Vertica directs the method to filter the data in a new data stream. Therefore, your destroy() method should leave an object of your UDFilter subclass in a state where the setup() method can prepare it to be reused.
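The reuse requirement can be illustrated with a small decompression sketch built on Python's standard zlib module. The class ZlibInflateFilter and the driver filter_stream are hypothetical and are not derived from the Vertica SDK; their methods take simplified arguments so that the example runs on its own. The point is only that setup() creates fresh per-stream state and destroy() releases it, so the same object can filter a second stream.

```python
import zlib


class ZlibInflateFilter:
    """Filter-shaped class whose per-stream state lives between setup() and destroy()."""

    def __init__(self):
        self._decomp = None

    def setup(self):
        # A decompressor cannot be reused once its stream has ended,
        # so create a new one for every data stream.
        self._decomp = zlib.decompressobj()

    def process(self, chunk):
        # Output size is unrelated to input size; a call may return b"".
        return self._decomp.decompress(chunk)

    def destroy(self):
        # Release per-stream state so that a later setup() starts clean.
        self._decomp = None


def filter_stream(flt, chunks):
    """Hypothetical driver: one setup/process/destroy cycle per data stream."""
    flt.setup()
    try:
        return b"".join(flt.process(c) for c in chunks)
    finally:
        flt.destroy()


flt = ZlibInflateFilter()
first = zlib.compress(b"first stream")
second = zlib.compress(b"second stream")

out1 = filter_stream(flt, [first[:5], first[5:]])   # stream split across two calls
out2 = filter_stream(flt, [second])                 # same object, new stream
```

If setup() kept the old decompressor instead of creating a new one, the second stream would produce no output, because the decompressor would already have reached the end of its first stream.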

API

In C++, the UDFilter API provides the following methods for extension by subclasses:

virtual void setup(ServerInterface &srvInterface);

virtual bool useSideChannel();

virtual StreamState process(ServerInterface &srvInterface, DataBuffer &input, InputState input_state, DataBuffer &output)=0;

virtual StreamState processWithMetadata(ServerInterface &srvInterface, DataBuffer &input,
    LengthBuffer &input_lengths, InputState input_state, DataBuffer &output, LengthBuffer &output_lengths)=0;

virtual void cancel(ServerInterface &srvInterface);

virtual void destroy(ServerInterface &srvInterface);

In Java, the UDFilter API provides the following methods for extension by subclasses:

public void setup(ServerInterface srvInterface) throws UdfException;

public abstract StreamState process(ServerInterface srvInterface, DataBuffer input,
                InputState input_state, DataBuffer output)
    throws UdfException;

protected void cancel(ServerInterface srvInterface);

public void destroy(ServerInterface srvInterface) throws UdfException;

In Python, the UDFilter API provides the following methods for extension by subclasses:

class PyUDFilter(vertica_sdk.UDFilter):
    def __init__(self):
        pass

    def setup(self, srvInterface):
        pass

    def process(self, srvInterface, inputbuffer, outputbuffer, inputstate):
        # Process the data here and write the result into outputbuffer.
        return StreamState.DONE