This is the multi-page printable view of this section. Click here to print.

Return to the regular view of this page.

User-defined filter

User-defined filter functions allow you to manipulate data obtained from a source in various ways.

User-defined filter functions allow you to manipulate data obtained from a source in various ways. For example, a filter can:

  • Process a compressed file in a compression format not natively supported by Vertica.

  • Take UTF-16-encoded data and transcode it to UTF-8 encoding.

  • Perform search-and-replace operations on data before it is loaded into Vertica.

You can also process data through multiple filters before it is loaded into Vertica. For instance, you could unzip a file compressed with GZip, convert the content from UTF-16 to UTF-8, and finally search and replace certain text strings.

If you implement a UDFilter, you must also implement a corresponding FilterFactory.

See UDFilter class and FilterFactory class for API details.

1 - UDFilter class

The UDFilter class is responsible for reading raw input data from a source and preparing it to be loaded into Vertica or processed by a parser.

The UDFilter class is responsible for reading raw input data from a source and preparing it to be loaded into Vertica or processed by a parser. This preparation may involve decompression, re-encoding, or any other sort of binary manipulation.

A UDFilter is instantiated by a corresponding FilterFactory on each host in the Vertica cluster that is performing filtering for the data source.

UDFilter methods

Your UDFilter subclass must override process() or processWithMetadata():

  • process() reads the raw input stream as one large file. If there are any errors or failures, the entire load fails.
    You can implement process() when the upstream source implements processWithMetadata(), but it might result in parsing errors.

  • processWithMetadata() is useful when the data source has metadata about record boundaries available in some structured format that's separate from the data payload. With this interface, the source emits a record length for each record in addition to the data.

    By implementing processWithMetadata() instead of process() in each phase, you can retain this record length metadata throughout the load stack, which enables a more efficient parse that can recover from errors on a per-message basis, rather than a per-file or per-source basis. KafkaSource and the Kafka parsers (KafkaAvroParser, KafkaJSONParser, and KafkaParser) use this mechanism to support per-Kafka-message rejections when individual Kafka messages are corrupted.

    Using processWithMetadata() with your UDFilter subclass enables you to write an internal filter that integrates the record length metadata from the source into the data stream, producing a single byte stream with boundary information to help parsers extract and process individual messages. KafkaInsertDelimeters and KafkaInsertLengths use this mechanism to insert message boundary information into Kafka data streams.

Optionally, you can override other UDFilter class methods.

Filter execution

The following sections detail the execution sequence each time a user-defined filter is called. The following example overrides the process() method.

Setting Up
COPY calls setup() before the first time it calls process(). Use setup() to perform any necessary setup steps that your filter needs to operate, such as initializing data structures to be used during filtering. Your object might be destroyed and re-created during use, so make sure that your object is restartable.

Filtering Data
COPY calls process() repeatedly during query execution to filter data. The method receives two instances of the DataBuffer class among its parameters, an input and an output buffer. Your implementation should read from the input buffer, manipulate it in some manner (such as decompressing it), and write the result to the output. A one-to-one correlation between the number of bytes your implementation reads and the number it writes might not exist. The process() method should process data until it either runs out of data to read or runs out of space in the output buffer. When one of these conditions occurs, your method should return one of the following values defined by StreamState:

  • OUTPUT_NEEDED if the filter needs more room in its output buffer.

  • INPUT_NEEDED if the filter has run out of input data (but the data source has not yet been fully processed).

  • DONE if the filter has processed all of the data in the data source.

  • KEEP_GOING if the filter cannot proceed for an extended period of time. The method will be called again, so do not block indefinitely. If you do, then you prevent your user from canceling the query.

Before returning, your process() method must set the offset property in each DataBuffer. In the input buffer, set it to the number of bytes that the method successfully read. In the output buffer, set it to the number of bytes the method wrote. Setting these properties allows the next call to process() to resume reading and writing data at the correct points in the buffers.

Your process() method also needs to check the InputState object passed to it to determine if there is more data in the data source. When this object is equal to END_OF_FILE, then the data remaining in the input data is the last data in the data source. Once it has processed all of the remaining data, process() must return DONE.

Tearing Down
COPY calls destroy() after the last time it calls process(). This method frees any resources reserved by the setup() or process() methods. Vertica calls this method after the process() method indicates it has finished filtering all of the data in the data stream.

If there are still data sources that have not yet been processed, Vertica may later call setup() on the object again. On subsequent calls Vertica directs the method to filter the data in a new data stream. Therefore, your destroy() method should leave an object of your UDFilter subclass in a state where the setup() method can prepare it to be reused.

API

The UDFilter API provides the following methods for extension by subclasses:

virtual void setup(ServerInterface &srvInterface);

virtual bool useSideChannel();

virtual StreamState process(ServerInterface &srvInterface, DataBuffer &input, InputState input_state, DataBuffer &output)=0;

virtual StreamState processWithMetadata(ServerInterface &srvInterface, DataBuffer &input,
    LengthBuffer &input_lengths, InputState input_state, DataBuffer &output, LengthBuffer &output_lengths)=0;

virtual void cancel(ServerInterface &srvInterface);

virtual void destroy(ServerInterface &srvInterface);

The UDFilter API provides the following methods for extension by subclasses:

public void setup(ServerInterface srvInterface) throws UdfException;

public abstract StreamState process(ServerInterface srvInterface, DataBuffer input,
                InputState input_state, DataBuffer output)
    throws UdfException;

protected void cancel(ServerInterface srvInterface);

public void destroy(ServerInterface srvInterface) throws UdfException;

The UDFilter API provides the following methods for extension by subclasses:

class PyUDFilter(vertica_sdk.UDFilter):
    def __init__(self):
        pass

    def setup(self, srvInterface):
        pass

    def process(self, srvInterface, inputbuffer, outputbuffer, inputstate):
        # User process data here, and put into outputbuffer.
        return StreamState.DONE

2 - FilterFactory class

If you write a filter, you must also write a filter factory to produce filter instances.

If you write a filter, you must also write a filter factory to produce filter instances. To do so, subclass the FilterFactory class.

Your subclass performs the initial validation and planning of the function execution and instantiates UDFilter objects on each host that will be filtering data.

Filter factories are singletons. Your subclass must be stateless, with no fields containing data. The subclass also must not modify any global variables.

FilterFactory methods

The FilterFactory class defines the following methods. Your subclass must override the prepare() method. It may override the other methods.

Setting up

Vertica calls plan() once on the initiator node, to perform the following tasks:

  • Check any parameters that have been passed from the function call in the COPY statement and error messages if there are any issues. You read the parameters by getting a ParamReader object from the instance of ServerInterface passed into your plan() method.

  • Store any information that the individual hosts need in order to filter data in the PlanContext instance passed as a parameter. For example, you could store details of the input format that the filter will read and output the format that the filter should produce. The plan() method runs only on the initiator node, and the prepare() method runs on each host reading from a data source. Therefore, this object is the only means of communication between them.

    You store data in the PlanContext by getting a ParamWriter object from the getWriter() method. You then write parameters by calling methods on the ParamWriter such as setString.

Creating filters

Vertica calls prepare() to create and initialize your filter. It calls this method once on each node that will perform filtering. Vertica automatically selects the best nodes to complete the work based on available resources. You cannot specify the nodes on which the work is done.

Defining parameters

Implement getParameterTypes() to define the names and types of parameters that your filter uses. Vertica uses this information to warn callers about unknown or missing parameters. Vertica ignores unknown parameters and uses default values for missing parameters. While you should define the types and parameters for your function, you are not required to override this method.

API

The FilterFactory API provides the following methods for extension by subclasses:

virtual void plan(ServerInterface &srvInterface, PlanContext &planCtxt);

virtual UDFilter * prepare(ServerInterface &srvInterface, PlanContext &planCtxt)=0;

virtual void getParameterType(ServerInterface &srvInterface, SizedColumnTypes &parameterTypes);

After creating your FilterFactory, you must register it with the RegisterFactory macro.

The FilterFactory API provides the following methods for extension by subclasses:

public void plan(ServerInterface srvInterface, PlanContext planCtxt)
    throws UdfException;

public abstract UDFilter prepare(ServerInterface srvInterface, PlanContext planCtxt)
    throws UdfException;

public void getParameterType(ServerInterface srvInterface, SizedColumnTypes parameterTypes);

The FilterFactory API provides the following methods for extension by subclasses:


class PyFilterFactory(vertica_sdk.SourceFactory):
    def __init__(self):
        pass
    def plan(self):
        pass
    def prepare(self, planContext):
        #User implement the function to create PyUDSources.
        pass

3 - Java example: ReplaceCharFilter

The example in this section demonstrates creating a UDFilter that replaces any occurrences of a character in the input stream with another character in the output stream.

The example in this section demonstrates creating a UDFilter that replaces any occurrences of a character in the input stream with another character in the output stream. This example is highly simplified and assumes the input stream is ASCII data.

Always remember that the input and output streams in a UDFilter are actually binary data. If you are performing character transformations using a UDFilter, convert the data stream from a string of bytes into a properly encoded string. For example, your input stream might consist of UTF-8 encoded text. If so, be sure to transform the raw binary being read from the buffer into a UTF string before manipulating it.

Loading and using the example

The example UDFilter has two required parameters. The from_char parameter specifies the character to be replaced, and the to_char parameter specifies the replacement character. Load and use the ReplaceCharFilter UDFilter as follows:

=> CREATE LIBRARY JavaLib AS '/home/dbadmin/JavaUDlLib.jar'
->LANGUAGE 'JAVA';
CREATE LIBRARY
=> CREATE FILTER ReplaceCharFilter as LANGUAGE 'JAVA'
->name 'com.mycompany.UDL.ReplaceCharFilterFactory' library JavaLib;
CREATE FILTER FUNCTION
=> CREATE TABLE t (text VARCHAR);
CREATE TABLE
=> COPY t FROM STDIN WITH FILTER ReplaceCharFilter(from_char='a', to_char='z');
Enter data to be copied followed by a newline.
End with a backslash and a period on a line by itself.
>> Mary had a little lamb
>> a man, a plan, a canal, Panama
>> \.

=> SELECT * FROM t;
              text
--------------------------------
 Mzry hzd z little lzmb
 z mzn, z plzn, z cznzl, Pznzmz
(2 rows)

=> --Calling the filter with incorrect parameters returns errors
=> COPY t from stdin with filter ReplaceCharFilter();
ERROR 3399:  Failure in UDx RPC call InvokePlanUDL(): Error in User Defined Object [
ReplaceCharFilter], error code: 0
com.vertica.sdk.UdfException: You must supply two parameters to ReplaceChar: 'from_char' and 'to_char'
        at com.vertica.JavaLibs.ReplaceCharFilterFactory.plan(ReplaceCharFilterFactory.java:22)
        at com.vertica.udxfence.UDxExecContext.planUDFilter(UDxExecContext.java:889)
        at com.vertica.udxfence.UDxExecContext.planCurrentUDLType(UDxExecContext.java:865)
        at com.vertica.udxfence.UDxExecContext.planUDL(UDxExecContext.java:821)
        at com.vertica.udxfence.UDxExecContext.run(UDxExecContext.java:242)
        at java.lang.Thread.run(Thread.java:662)

Parser implementation

The ReplaceCharFilter class reads the data stream, replacing each occurrence of a user-specified character with another character.

package com.vertica.JavaLibs;

import com.vertica.sdk.DataBuffer;
import com.vertica.sdk.ServerInterface;
import com.vertica.sdk.State.InputState;
import com.vertica.sdk.State.StreamState;
import com.vertica.sdk.UDFilter;

public class ReplaceCharFilter extends UDFilter {

    private byte[] fromChar;
    private byte[] toChar;

    public ReplaceCharFilter(String fromChar, String toChar){
        // Stores the from char and to char as byte arrays. This is
        // not a robust method of doing this, but works for this simple
        // example.
        this.fromChar= fromChar.getBytes();
        this.toChar=toChar.getBytes();
    }
    @Override
    public StreamState process(ServerInterface srvInterface, DataBuffer input,
            InputState input_state, DataBuffer output) {

        // Check if there is no more input and the input buffer has been completely
        // processed. If so, filtering is done.
        if (input_state == InputState.END_OF_FILE && input.buf.length == 0) {
            return StreamState.DONE;
        }

        // Get current position in the input buffer
        int offset = output.offset;

        // Determine how many bytes to process. This is either until input
        // buffer is exhausted or output buffer is filled
        int limit = Math.min((input.buf.length - input.offset),
                (output.buf.length - output.offset));

        for (int i = input.offset; i < limit; i++) {
            // This example just replaces each instance of from_char
            // with to_char. It does not consider things such as multi-byte
            // UTF-8 characters.
            if (input.buf[i] == fromChar[0]) {
                output.buf[i+offset] = toChar[0];
            } else {
                // Did not find from_char, so copy input to the output
                output.buf[i+offset]=input.buf[i];
            }
        }

        input.offset += limit;
        output.offset += input.offset;

        if (input.buf.length - input.offset < output.buf.length - output.offset) {
            return StreamState.INPUT_NEEDED;
        } else {
            return StreamState.OUTPUT_NEEDED;
        }
    }
}

Factory implementation

ReplaceCharFilterFactory requires two parameters (from_char and to_char). The plan() method verifies that these parameters exist and are single-character strings. The method then stores them in the plan context. The prepare() method gets the parameter values and passes them to the ReplaceCharFilter objects, which it instantiates, to perform the filtering.

package com.vertica.JavaLibs;

import java.util.ArrayList;
import java.util.Vector;

import com.vertica.sdk.FilterFactory;
import com.vertica.sdk.PlanContext;
import com.vertica.sdk.ServerInterface;
import com.vertica.sdk.SizedColumnTypes;
import com.vertica.sdk.UDFilter;
import com.vertica.sdk.UdfException;

public class ReplaceCharFilterFactory extends FilterFactory {

    // Run on the initiator node to perform varification and basic setup.
    @Override
    public void plan(ServerInterface srvInterface,PlanContext planCtxt)
                     throws UdfException {
        ArrayList<String> args =
                          srvInterface.getParamReader().getParamNames();

        // Ensure user supplied two arguments
        if (!(args.contains("from_char") && args.contains("to_char"))) {
            throw new UdfException(0, "You must supply two parameters" +
                        " to ReplaceChar: 'from_char' and 'to_char'");
        }

        // Verify that the from_char is a single character.
        String fromChar = srvInterface.getParamReader().getString("from_char");
        if (fromChar.length() != 1) {
            String message =  String.format("Replacechar expects a single " +
                "character in the 'from_char' parameter. Got length %d",
                fromChar.length());
            throw new UdfException(0, message);
        }

        // Save the from character in the plan context, to be read by
        // prepare() method.
        planCtxt.getWriter().setString("fromChar",fromChar);

        // Ensure to character parameter is a single characater
        String toChar = srvInterface.getParamReader().getString("to_char");
        if (toChar.length() != 1) {
            String message =  String.format("Replacechar expects a single "
                 + "character in the 'to_char' parameter. Got length %d",
                toChar.length());
            throw new UdfException(0, message);
        }
        // Save the to character in the plan data
        planCtxt.getWriter().setString("toChar",toChar);
    }

    // Called on every host that will filter data. Must instantiate the
    // UDFilter subclass.
    @Override
    public UDFilter prepare(ServerInterface srvInterface, PlanContext planCtxt)
                            throws UdfException {
        // Get data stored in the context by the plan() method.
        String fromChar = planCtxt.getWriter().getString("fromChar");
        String toChar = planCtxt.getWriter().getString("toChar");

        // Instantiate a filter object to perform filtering.
        return new ReplaceCharFilter(fromChar, toChar);
    }

    // Describe the parameters accepted by this filter.
    @Override
    public void getParameterType(ServerInterface srvInterface,
             SizedColumnTypes parameterTypes) {
        parameterTypes.addVarchar(1, "from_char");
        parameterTypes.addVarchar(1, "to_char");
    }
}

4 - C++ example: converting encoding

The following example shows how you can convert encoding for a file from one type to another by converting UTF-16 encoded data to UTF-8.

The following example shows how you can convert encoding for a file from one type to another by converting UTF-16 encoded data to UTF-8. You can find this example in the SDK at /opt/vertica/sdk/examples/FilterFunctions/IConverter.cpp.

Filter implementation

class Iconverter : public UDFilter{
private:
    std::string fromEncoding, toEncoding;
    iconv_t cd; // the conversion descriptor opened
    uint converted; // how many characters have been converted
protected:
    virtual StreamState process(ServerInterface &srvInterface, DataBuffer &input,
                                InputState input_state, DataBuffer &output)
    {
        char *input_buf = (char *)input.buf + input.offset;
        char *output_buf = (char *)output.buf + output.offset;
        size_t inBytesLeft = input.size - input.offset, outBytesLeft = output.size - output.offset;
        // end of input
        if (input_state == END_OF_FILE && inBytesLeft == 0)
        {
            // Gnu libc iconv doc says, it is good practice to finalize the
            // outbuffer for stateful encodings (by calling with null inbuffer).
            //
            // http://www.gnu.org/software/libc/manual/html_node/Generic-Conversion-Interface.html
            iconv(cd, NULL, NULL, &output_buf, &outBytesLeft);
            // output buffer can be updated by this operation
            output.offset = output.size - outBytesLeft;
            return DONE;
        }
        size_t ret = iconv(cd, &input_buf, &inBytesLeft, &output_buf, &outBytesLeft);
        // if conversion is successful, we ask for more input, as input has not reached EOF.
        StreamState retStatus = INPUT_NEEDED;
        if (ret == (size_t)(-1))
        {
            // seen an error
            switch (errno)
            {
            case E2BIG:
                // input size too big, not a problem, ask for more output.
                retStatus = OUTPUT_NEEDED;
                break;
            case EINVAL:
                // input stops in the middle of a byte sequence, not a problem, ask for more input
                retStatus = input_state == END_OF_FILE ? DONE : INPUT_NEEDED;
                break;
            case EILSEQ:
                // invalid sequence seen, throw
                // TODO: reporting the wrong byte position
                vt_report_error(1, "Invalid byte sequence when doing %u-th conversion", converted);
            case EBADF:
                // something wrong with descriptor, throw
                vt_report_error(0, "Invalid descriptor");
            default:
                vt_report_error(0, "Uncommon Error");
                break;
            }
        }
        else converted += ret;
        // move position pointer
        input.offset = input.size - inBytesLeft;
        output.offset = output.size - outBytesLeft;
        return retStatus;
    }
public:
    Iconverter(const std::string &from, const std::string &to)
    : fromEncoding(from), toEncoding(to), converted(0)
    {
        // note "to encoding" is first argument to iconv...
        cd = iconv_open(to.c_str(), from.c_str());
        if (cd == (iconv_t)(-1))
        {
            // error when creating converters.
            vt_report_error(0, "Error initializing iconv: %m");
        }
    }
    ~Iconverter()
    {
        // free iconv resources;
        iconv_close(cd);
    }
};

Factory implementation

class IconverterFactory : public FilterFactory{
public:
    virtual void plan(ServerInterface &srvInterface,
            PlanContext &planCtxt) {
        std::vector<std::string> args = srvInterface.getParamReader().getParamNames();
        /* Check parameters */
        if (!(args.size() == 0 ||
                (args.size() == 1 && find(args.begin(), args.end(), "from_encoding")
                        != args.end()) || (args.size() == 2
                        && find(args.begin(), args.end(), "from_encoding") != args.end()
                        && find(args.begin(), args.end(), "to_encoding") != args.end()))) {
            vt_report_error(0, "Invalid arguments.  Must specify either no arguments,  or "
                               "'from_encoding' alone, or 'from_encoding' and 'to_encoding'.");
        }
        /* Populate planData */
        // By default, we do UTF16->UTF8, and x->UTF8
        VString from_encoding = planCtxt.getWriter().getStringRef("from_encoding");
        VString to_encoding = planCtxt.getWriter().getStringRef("to_encoding");
        from_encoding.copy("UTF-16");
        to_encoding.copy("UTF-8");
        if (args.size() == 2)
        {
            from_encoding.copy(srvInterface.getParamReader().getStringRef("from_encoding"));
            to_encoding.copy(srvInterface.getParamReader().getStringRef("to_encoding"));
        }
        else if (args.size() == 1)
        {
            from_encoding.copy(srvInterface.getParamReader().getStringRef("from_encoding"));
        }
        if (!from_encoding.length()) {
            vt_report_error(0, "The empty string is not a valid from_encoding value");
        }
        if (!to_encoding.length()) {
            vt_report_error(0, "The empty string is not a valid to_encoding value");
        }
    }
    virtual UDFilter* prepare(ServerInterface &srvInterface,
            PlanContext &planCtxt) {
        return vt_createFuncObj(srvInterface.allocator, Iconverter,
                planCtxt.getReader().getStringRef("from_encoding").str(),
                planCtxt.getReader().getStringRef("to_encoding").str());
    }
    virtual void getParameterType(ServerInterface &srvInterface,
                                  SizedColumnTypes &parameterTypes) {
        parameterTypes.addVarchar(32, "from_encoding");
        parameterTypes.addVarchar(32, "to_encoding");
    }
};
RegisterFactory(IconverterFactory);