User-defined load (UDL)

COPY offers extensive options and settings to control how to load data. However, you may find that these options do not suit the type of data load that you want to perform. The user-defined load (UDL) feature lets you develop one or more functions that change how the COPY statement operates. You can create custom libraries using the Vertica SDK to handle various steps in the loading process.

You use three types of UDL functions during development, one for each stage of the data-load process:

  • User-defined source (UDSource): Controls how COPY obtains the data it loads into the database. For example, COPY might obtain data by fetching it over HTTP using cURL. A UDSource reads data from a file or input stream. Your UDSource can read from more than one source, but COPY invokes only one UDSource.

    API support: C++, Java.

  • User-defined filter (UDFilter): Preprocesses the data. For example, a filter might unzip a file or convert UTF-16 to UTF-8. You can chain multiple user-defined filters together, for example unzipping and then converting.

    API support: C++, Java, Python.

  • User-defined parser (UDParser): Parses the data into tuples that are ready to be inserted into a table; COPY invokes at most one parser. For example, a parser could extract data from an XML-like format. You can optionally define a user-defined chunker (UDChunker, C++ only) to have the parser perform parallel parsing.

    API support: C++, Java, Python.

After the final step, COPY inserts the data into a table, or rejects it if the format is incorrect.

1 - User-defined source

A user-defined source allows you to process a source of data using a method that is not built into Vertica. For example, you can write a user-defined source to access the data from an HTTP source using cURL. While a given COPY statement can specify only one user-defined source, the source function itself can pull data from multiple sources.

The UDSource class acquires data from an external source. It reads data from an input stream and produces an output stream to be filtered and parsed. If you implement a UDSource, you must also implement a corresponding SourceFactory.

1.1 - UDSource class

You can subclass the UDSource class when you need to load data from a source type that COPY does not already support.

Each instance of your UDSource subclass reads from a single data source. Examples of a single data source are a single file or the results of a single function call to a RESTful web application.

UDSource methods

Your UDSource subclass must override process() or processWithMetadata():

  • process() reads the raw input stream as one large file. If there are any errors or failures, the entire load fails.

  • processWithMetadata() is useful when the data source has metadata about record boundaries available in some structured format that's separate from the data payload. With this interface, the source emits a record length for each record in addition to the data.

    By implementing processWithMetadata() instead of process() in each phase, you can retain this record length metadata throughout the load stack, which enables a more efficient parse that can recover from errors on a per-message basis, rather than a per-file or per-source basis. KafkaSource and the Kafka parsers (KafkaAvroParser, KafkaJSONParser, and KafkaParser) use this mechanism to support per-Kafka-message rejections when individual Kafka messages cannot be parsed.

Additionally, you can override the other UDSource class methods.

Source execution

The following sections detail the execution sequence each time a user-defined source is called, for a source that overrides the process() method.

Setting Up
COPY calls setup() before the first time it calls process(). Use setup() to perform any necessary setup steps to access the data source. This method establishes network connections, opens files, and performs similar tasks that must be completed before the UDSource can read data from the data source. Your object might be destroyed and re-created during use, so make sure that your object is restartable.

Processing a Source
COPY calls process() repeatedly during query execution to read data and write it to the DataBuffer passed as a parameter. This buffer is then passed to the first filter.

If the source runs out of input, or fills the output buffer, it must return the value StreamState.OUTPUT_NEEDED. When Vertica gets this return value, it will call the method again. This second call occurs after the output buffer has been processed by the next stage in the data-load process. Returning StreamState.DONE indicates that all of the data from the source has been read.

The user can cancel the load operation, which aborts reading.

Tearing Down
COPY calls destroy() after the last time that process() is called. This method frees any resources reserved by the setup() or process() methods, such as file handles or network connections that the setup() method allocated.
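
This execution sequence maps onto a small subclass. The following is a minimal sketch rather than one of the SDK examples: the class name, the path member, and the use of standard C file I/O (<cstdio>) are illustrative assumptions.

class SimpleFileSource : public UDSource {
private:
    std::string path;   // set by the factory (illustrative)
    FILE *fp;

public:
    SimpleFileSource(const std::string &p) : path(p), fp(NULL) {}

    // Open the file before the first call to process().
    virtual void setup(ServerInterface &srvInterface) {
        fp = fopen(path.c_str(), "rb");
        if (fp == NULL) {
            vt_report_error(0, "Could not open [%s]", path.c_str());
        }
    }

    // Fill the output buffer and record how many bytes were written.
    // Return OUTPUT_NEEDED until the file is exhausted, then DONE.
    virtual StreamState process(ServerInterface &srvInterface, DataBuffer &output) {
        output.offset = fread(output.buf, 1, output.size, fp);
        return feof(fp) ? DONE : OUTPUT_NEEDED;
    }

    // Release the file handle after the last call to process().
    virtual void destroy(ServerInterface &srvInterface) {
        if (fp != NULL) fclose(fp);
    }
};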

Accessors

A source can define two accessors, getSize() and getUri().

COPY might call getSize() to estimate the number of bytes of data to be read before calling process(). This value is an estimate only and is used to indicate the file size in the LOAD_STREAMS table. Because Vertica can call this method before calling setup(), getSize() must not rely on any resources allocated by setup().

This method should not leave any resources open. For example, do not save any file handles opened by getSize() for use by the process() method. Doing so can exhaust the available resources, because Vertica calls getSize() on all instances of your UDSource subclass before any data is loaded. If many data sources are being opened, these open file handles could use up the system's supply of file handles. Thus, none would remain available to perform the actual data load.

Vertica calls getUri() during execution to update status information about which resources are currently being loaded. It returns the URI of the data source being read by this UDSource.
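
The following sketch shows what these accessors might look like for the hypothetical SimpleFileSource sketched above. getSize() uses stat() from <sys/stat.h> instead of opening the file, so it leaves no resources open, and the value it returns is only an estimate.

virtual vint getSize() {
    struct stat fileStats;
    if (stat(path.c_str(), &fileStats) == 0) {
        return fileStats.st_size;   // estimate reported in LOAD_STREAMS
    }
    return -1;                      // size unknown
}

virtual std::string getUri() {
    return "file://" + path;        // identifies this source in status information
}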

API

The C++ UDSource API provides the following methods for extension by subclasses:

virtual void setup(ServerInterface &srvInterface);

virtual bool useSideChannel();

virtual StreamState process(ServerInterface &srvInterface, DataBuffer &output)=0;

virtual StreamState processWithMetadata(ServerInterface &srvInterface, DataBuffer &output, LengthBuffer &output_lengths)=0;

virtual void cancel(ServerInterface &srvInterface);

virtual void destroy(ServerInterface &srvInterface);

virtual vint getSize();

virtual std::string getUri();

The Java UDSource API provides the following methods for extension by subclasses:

public void setup(ServerInterface srvInterface) throws UdfException;

public abstract StreamState process(ServerInterface srvInterface, DataBuffer output) throws UdfException;

protected void cancel(ServerInterface srvInterface);

public void destroy(ServerInterface srvInterface) throws UdfException;

public Integer getSize();

public String getUri();

1.2 - SourceFactory class

If you write a source, you must also write a source factory. Your subclass of the SourceFactory class is responsible for:

  • Performing the initial validation of the parameters passed to your UDSource.

  • Setting up any data structures your UDSource instances need to perform their work. This information can include recording which nodes will read which data source.

  • Creating one instance of your UDSource subclass for each data source (or portion thereof) that your function reads on each host.

The simplest source factory creates one UDSource instance per data source per executor node. You can also use multiple concurrent UDSource instances on each node. This behavior is called concurrent load. To support both options, SourceFactory has two versions of the method that creates the sources. You must implement exactly one of them.

Source factories are singletons. Your subclass must be stateless, with no fields containing data. The subclass also must not modify any global variables.

SourceFactory methods

The SourceFactory class defines several methods. Your class must override prepareUDSources(); it may override the other methods.

Setting up

Vertica calls plan() once on the initiator node to perform the following tasks:

  • Check the parameters the user supplied to the function call in the COPY statement and provide error messages if there are any issues. You can read the parameters by getting a ParamReader object from the instance of ServerInterface passed into the plan() method.

  • Decide which hosts in the cluster will read the data source. How you divide up the work depends on the source your function is reading. Some sources can be split across many hosts, such as a source that reads data from many URLs. Others, such as an individual local file on a host's file system, can be read only by a single specified host.

    You store the list of hosts to read the data source by calling the setTargetNodes() method on the NodeSpecifyingPlanContext object. This object is passed into your plan() method.

  • Store any information that the individual hosts need to process the data sources in the NodeSpecifyingPlanContext instance passed to the plan() method. For example, you could store assignments that tell each host which data sources to process. The plan() method runs only on the initiator node, and the prepareUDSources() method runs on each host reading from a data source. Therefore, this object is the only means of communication between them.

    You store data in the NodeSpecifyingPlanContext by getting a ParamWriter object from the getWriter() method. You then write parameters by calling methods on the ParamWriter such as setString().

Creating sources

Vertica calls prepareUDSources() on all hosts that the plan() method selected to load data. This call instantiates and returns a list of UDSource subclass instances. If you are not using concurrent load, return one UDSource for each of the sources that the host is assigned to process. If you are using concurrent load, use the version of the method that takes an ExecutorPlanContext as a parameter, and return as many sources as you can use. Your factory must implement exactly one of these methods.

For concurrent load, you can find out how many threads are available on the node to run UDSource instances by calling getLoadConcurrency() on the ExecutorPlanContext that is passed in.

Defining parameters

Implement getParameterTypes() to define the names and types of parameters that your source uses. Vertica uses this information to warn callers about unknown or missing parameters. Vertica ignores unknown parameters and uses default values for missing parameters. While you should define the types and parameters for your function, you are not required to override this method.

Requesting threads for concurrent load

When a source factory creates sources on an executor node, by default, it creates one thread per source. If your sources can use multiple threads, implement getDesiredThreads(). Vertica calls this method before it calls prepareUDSources(), so you can also use it to decide how many sources to create. Return the number of threads your factory can use for sources. The maximum number of available threads is passed in, so you can take that into account. The value your method returns is a hint, not a guarantee; each executor node determines the number of threads to allocate. The FilePortionSourceFactory example implements this method; see C++ example: concurrent load.

You can allow your source to have control over parallelism, meaning that it can divide a single input into multiple load streams, by implementing isSourceApportionable(). Returning true from this method does not guarantee that the source will apportion the load. However, returning false indicates that it will not try to do so. See Apportioned load for more information.

Often, a SourceFactory that implements getDesiredThreads() also uses apportioned load. However, using apportioned load is not a requirement. A source reading from Kafka streams, for example, could use multiple threads without apportioning.

API

The C++ SourceFactory API provides the following methods for extension by subclasses:

virtual void plan(ServerInterface &srvInterface, NodeSpecifyingPlanContext &planCtxt);

// must implement exactly one of prepareUDSources() or prepareUDSourcesExecutor()
virtual std::vector< UDSource * > prepareUDSources(ServerInterface &srvInterface,
            NodeSpecifyingPlanContext &planCtxt);

virtual std::vector< UDSource * > prepareUDSourcesExecutor(ServerInterface &srvInterface,
            ExecutorPlanContext &planCtxt);

virtual void getParameterType(ServerInterface &srvInterface,
            SizedColumnTypes &parameterTypes);

virtual bool isSourceApportionable();

ssize_t getDesiredThreads(ServerInterface &srvInterface,
            ExecutorPlanContext &planContext);

After creating your SourceFactory, you must register it with the RegisterFactory macro.

The Java SourceFactory API provides the following methods for extension by subclasses:

public void plan(ServerInterface srvInterface, NodeSpecifyingPlanContext planCtxt)
    throws UdfException;

// must implement one overload of prepareUDSources()
public ArrayList< UDSource > prepareUDSources(ServerInterface srvInterface,
                NodeSpecifyingPlanContext planCtxt)
    throws UdfException;

public ArrayList< UDSource > prepareUDSources(ServerInterface srvInterface,
                ExecutorPlanContext planCtxt)
    throws UdfException;

public void getParameterType(ServerInterface srvInterface, SizedColumnTypes parameterTypes);

public boolean isSourceApportionable();

public int getDesiredThreads(ServerInterface srvInterface,
                ExecutorPlanContext planCtxt)
    throws UdfException;

1.3 - C++ example: CurlSource

The CurlSource example allows you to use cURL to open and read in a file over HTTP. The example provided is part of: /opt/vertica/sdk/examples/SourceFunctions/cURL.cpp.

Source implementation

This example uses the helper library available in /opt/vertica/sdk/examples/HelperLibraries/.

CurlSource loads the data in chunks. If the parser encounters an EndOfFile marker, then the process() method returns DONE. Otherwise, the method returns OUTPUT_NEEDED and processes another chunk of data. The functions included in the helper library (such as url_fread() and url_fopen()) are based on examples that come with the libcurl library. For an example, see http://curl.haxx.se/libcurl/c/fopen.html.

The setup() function opens a file handle and the destroy() function closes it. Both use functions from the helper library.

class CurlSource : public UDSource {
private:
    URL_FILE *handle;
    std::string url;
    virtual StreamState process(ServerInterface &srvInterface, DataBuffer &output) {
        output.offset = url_fread(output.buf, 1, output.size, handle);
        return url_feof(handle) ? DONE : OUTPUT_NEEDED;
    }
public:
    CurlSource(std::string url) : url(url) {}
    void setup(ServerInterface &srvInterface) {
        handle = url_fopen(url.c_str(),"r");
    }
    void destroy(ServerInterface &srvInterface) {
        url_fclose(handle);
    }
};

Factory implementation

CurlSourceFactory produces CurlSource instances.

class CurlSourceFactory : public SourceFactory {
public:
    virtual void plan(ServerInterface &srvInterface,
            NodeSpecifyingPlanContext &planCtxt) {
        std::vector<std::string> args = srvInterface.getParamReader().getParamNames();
       /* Check parameters */
        if (args.size() != 1 || find(args.begin(), args.end(), "url") == args.end()) {
            vt_report_error(0, "You must provide a single URL.");
        }
        /* Populate planData */
        planCtxt.getWriter().getStringRef("url").copy(
                                    srvInterface.getParamReader().getStringRef("url"));

        /* Assign Nodes */
        std::vector<std::string> executionNodes = planCtxt.getClusterNodes();
        while (executionNodes.size() > 1) executionNodes.pop_back();
        // Only run on the first node in the list.
        planCtxt.setTargetNodes(executionNodes);
    }
    virtual std::vector<UDSource*> prepareUDSources(ServerInterface &srvInterface,
            NodeSpecifyingPlanContext &planCtxt) {
        std::vector<UDSource*> retVal;
        retVal.push_back(vt_createFuncObj(srvInterface.allocator, CurlSource,
                planCtxt.getReader().getStringRef("url").str()));
        return retVal;
    }
    virtual void getParameterType(ServerInterface &srvInterface,
                                  SizedColumnTypes &parameterTypes) {
        parameterTypes.addVarchar(65000, "url");
    }
};
RegisterFactory(CurlSourceFactory);

1.4 - C++ example: concurrent load

The FilePortionSource example demonstrates the use of concurrent load. This example is a refinement of the FileSource example. Each input file is divided into portions and distributed to FilePortionSource instances. The source accepts a list of offsets at which to break the input into portions; if offsets are not provided, the source divides the input dynamically.

Concurrent load is handled in the factory, so this discussion focuses on FilePortionSourceFactory. The full code for the example is located in /opt/vertica/sdk/examples/ApportionLoadFunctions. The distribution also includes a Java version of this example.

Loading and using the example

Load and use the FilePortionSource example as follows.

=> CREATE LIBRARY FilePortionLib AS '/home/dbadmin/FP.so';

=> CREATE SOURCE FilePortionSource AS LANGUAGE 'C++'
-> NAME 'FilePortionSourceFactory' LIBRARY FilePortionLib;

=> COPY t WITH SOURCE FilePortionSource(file='g1/*.dat', nodes='initiator,e0,e1', offsets = '0,380000,820000');

=> COPY t WITH SOURCE FilePortionSource(file='g2/*.dat', nodes='e0,e1,e2', local_min_portion_size = 2097152);

Implementation

Concurrent load affects the source factory in two places, getDesiredThreads() and prepareUDSourcesExecutor().

getDesiredThreads()

The getDesiredThreads() member function determines the number of threads to request. Vertica calls this member function on each executor node before calling prepareUDSourcesExecutor().

The function begins by breaking an input file path, which might be a glob, into individual paths. This discussion omits those details. If apportioned load is not being used, then the function allocates one source per file.

virtual ssize_t getDesiredThreads(ServerInterface &srvInterface,
    ExecutorPlanContext &planCtxt) {
  const std::string filename = srvInterface.getParamReader().getStringRef("file").str();

  std::vector<std::string> paths;
  // expand the glob - at least one thread per source.
  ...

  // figure out how to assign files to sources
  const std::string nodeName = srvInterface.getCurrentNodeName();
  const size_t nodeId = planCtxt.getWriter().getIntRef(nodeName);
  const size_t numNodes = planCtxt.getTargetNodes().size();

  if (!planCtxt.canApportionSource()) {
    /* no apportioning, so the number of files is the final number of sources */
    std::vector<std::string> *expanded =
        vt_createFuncObject<std::vector<std::string> >(srvInterface.allocator, paths);
    /* save expanded paths so we don't have to compute expansion again */
    planCtxt.getWriter().setPointer("expanded", expanded);
    return expanded->size();
  }

  // ...

If the source can be apportioned, then getDesiredThreads() uses the offsets that were passed as arguments to the factory to divide the file into portions. It then allocates portions to available nodes. This function does not actually assign sources directly; this work is done to determine how many threads to request.

  else if (srvInterface.getParamReader().containsParameter("offsets")) {

    // if the offsets are specified, then we will have a fixed number of portions per file.
    // Round-robin assign offsets to nodes.
    // ...

    /* Construct the portions that this node will actually handle.
     * This isn't changing (since the offset assignments are fixed),
     * so we'll build the Portion objects now and make them available
     * to prepareUDSourcesExecutor() by putting them in the ExecutorContext.
     *
     * We don't know the size of the last portion, since it depends on the file
     * size.  Rather than figure it out here we will indicate it with -1 and
     * defer that to prepareUDSourcesExecutor().
     */
    std::vector<Portion> *portions =
        vt_createFuncObject<std::vector<Portion>>(srvInterface.allocator);

    for (std::vector<size_t>::const_iterator offset = offsets.begin();
            offset != offsets.end(); ++offset) {
        Portion p(*offset);
        p.is_first_portion = (offset == offsets.begin());
        p.size = (offset + 1 == offsets.end() ? -1 : (*(offset + 1) - *offset));

        if ((offset - offsets.begin()) % numNodes == nodeId) {
            portions->push_back(p);
            srvInterface.log("FilePortionSource: assigning portion %ld: [offset = %lld, size = %lld]",
                    offset - offsets.begin(), p.offset, p.size);
        }
      }

The function now has all the portions and thus the number of portions:

      planCtxt.getWriter().setPointer("portions", portions);

      /* total number of threads we want is the number of portions per file, which is fixed */
      return portions->size() * expanded->size();
    } // end of "offsets" parameter

If offsets were not provided, the function divides the file into portions dynamically, one portion per thread. This discussion omits the details of this computation. There is no point in requesting more threads than are available, so the function calls getMaxAllowedThreads() on the PlanContext (an argument to the function) to set an upper bound:

  if (portions->size() >= planCtxt.getMaxAllowedThreads()) {
    return paths.size();
  }

See the full example for the details of how this function divides the file into portions.

This function uses the vt_createFuncObject template to create objects. Vertica calls the destructors of returned objects created using this macro, but it does not call destructors for other objects like vectors. You must call these destructors yourself to avoid memory leaks. In this example, these calls are made in prepareUDSourcesExecutor().
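
A minimal sketch of that cleanup follows. The placement and names are illustrative; see the full example for where these calls actually occur.

// Helper container created with vt_createFuncObject. Vertica destroys only the
// UDSource objects the factory returns, so destroy this container explicitly.
typedef std::vector<Portion> PortionList;
PortionList *portions = vt_createFuncObject<PortionList>(srvInterface.allocator);
// ... fill *portions and use it to construct the sources ...
portions->~PortionList();   // explicit destructor call; the SDK allocator owns the memory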

prepareUDSourcesExecutor()

The prepareUDSourcesExecutor() member function, like getDesiredThreads(), has separate blocks of code depending on whether offsets are provided. In both cases, the function breaks input into portions and creates UDSource instances for them.

If the function is called with offsets, prepareUDSourcesExecutor() calls prepareCustomizedPortions(). This function follows.

/* prepare portions as determined via the "offsets" parameter */
void prepareCustomizedPortions(ServerInterface &srvInterface,
                               ExecutorPlanContext &planCtxt,
                               std::vector<UDSource *> &sources,
                               const std::vector<std::string> &expandedPaths,
                               std::vector<Portion> &portions) {
    for (std::vector<std::string>::const_iterator filename = expandedPaths.begin();
            filename != expandedPaths.end(); ++filename) {
        /*
         * the "portions" vector contains the portions which were generated in
         * "getDesiredThreads"
         */
        const size_t fileSize = getFileSize(*filename);
        for (std::vector<Portion>::const_iterator portion = portions.begin();
                portion != portions.end(); ++portion) {
            Portion fportion(*portion);
            if (fportion.size == -1) {
                /* as described above, this means from the offset to the end */
                fportion.size = fileSize - portion->offset;
                sources.push_back(vt_createFuncObject<FilePortionSource>(srvInterface.allocator,
                            *filename, fportion));
            } else if (fportion.size > 0) {
                sources.push_back(vt_createFuncObject<FilePortionSource>(srvInterface.allocator,
                            *filename, fportion));
            }
        }
    }
}

If prepareUDSourcesExecutor() is called without offsets, then it must decide how many portions to create.

The base case is to use one portion per source. However, if extra threads are available, the function divides the input into more portions so that a source can process them concurrently. Then prepareUDSourcesExecutor() calls prepareGeneratedPortions() to create the portions. This function begins by calling getLoadConcurrency() on the plan context to find out how many threads are available.

void prepareGeneratedPortions(ServerInterface &srvInterface,
                              ExecutorPlanContext &planCtxt,
                              std::vector<UDSource *> &sources,
                              std::map<std::string, Portion> initialPortions) {

  if ((ssize_t) initialPortions.size() >= planCtxt.getLoadConcurrency()) {
  /* all threads will be used, don't bother splitting into portions */

  for (std::map<std::string, Portion>::const_iterator file = initialPortions.begin();
       file != initialPortions.end(); ++file) {
    sources.push_back(vt_createFuncObject<FilePortionSource>(srvInterface.allocator,
            file->first, file->second));
       } // for
    return;
  } // if

  // Now we can split files to take advantage of potentially-unused threads.
  // First sort by size (descending), then we will split the largest first.

  // details elided...

}

For more information

See the source code for the full implementation of this example.

1.5 - Java example: FileSource

The example shown in this section is a simple UDL Source function named FileSource. This function loads data from files stored on the host's file system (similar to the standard COPY statement). To call FileSource, you must supply a parameter named file that contains the absolute path to one or more files on the host file system. You can specify multiple files as a comma-separated list.

The FileSource function also accepts an optional parameter, named nodes, that indicates which nodes should load the files. If you do not supply this parameter, the function defaults to loading data on the initiator node only. Because this example is simple, the nodes load only the files from their own file systems. Any files in the file parameter must exist on all of the hosts in the nodes parameter. The FileSource UDSource attempts to load all of the files in the file parameter on all of the hosts in the nodes parameter.

Generating files

You can use the following Python script to generate files and distribute them to hosts in your Vertica cluster. With these files, you can experiment with the example UDSource function. Running the script requires passwordless-SSH logins to copy the files to the other hosts. Therefore, you must run the script using the database administrator account on one of your database hosts.

#!/usr/bin/python
# Save this file as UDLDataGen.py
import string
import random
import sys
import os

# Read in the dictionary file to provide random words. Assumes the words
# file is located in /usr/share/dict/words
wordFile = open("/usr/share/dict/words")
wordDict = []
for line in wordFile:
    if len(line) > 6:
        wordDict.append(line.strip())

MAXSTR = 4 # Maximum number of words to concatenate
NUMROWS = 1000 # Number of rows of data to generate
#FILEPATH = '/tmp/UDLdata.txt' # Final filename to use for UDL source
TMPFILE = '/tmp/UDLtemp.txt'  # Temporary filename.

# Generate a random string by concatenating several words together. Max
# number of words set by MAXSTR
def randomWords():
    words = [random.choice(wordDict) for n in xrange(random.randint(1, MAXSTR))]
    sentence = " ".join(words)
    return sentence

# Create a temporary data file that will be moved to a node. Number of
# rows for the file is set by NUMROWS. Adds the name of the node which will
# get the file, to show which node loaded the data.
def generateFile(node):
    outFile = open(TMPFILE, 'w')
    for line in xrange(NUMROWS):
        outFile.write('{0}|{1}|{2}\n'.format(line,randomWords(),node))
    outFile.close()

# Copy the temporary file to a node. Only works if passwordless SSH login
# is enabled, which it is for the database administrator account on
# Vertica hosts.
def copyFile(fileName,node):
    os.system('scp "%s" "%s:%s"' % (TMPFILE, node, fileName) )

# Loop through the comma-separated list of nodes given in the first
# parameter, creating and copying data files whose full comma-separated
# paths are passed in the second parameter
for node in [x.strip() for x in sys.argv[1].split(',')]:
    for fileName in [y.strip() for y in sys.argv[2].split(',')]:
        print "generating file", fileName, "for", node
        generateFile(node)
        print "Copying file to",node
        copyFile(fileName,node)

You call this script by giving it a comma-separated list of hosts to receive the files and a comma-separated list of absolute paths of files to generate. For example:

$ python UDLDataGen.py v_vmart_node0001,v_vmart_node0002,v_vmart_node0003 /tmp/UDLdata01.txt,/tmp/UDLdata02.txt,/tmp/UDLdata03.txt

This script generates files that contain a thousand rows of columns delimited with the pipe character (|). These columns contain an index value, a set of random words, and the node for which the file was generated, as shown in the following output sample:

0|megabits embanks|v_vmart_node0001
1|unneatly|v_vmart_node0001
2|self-precipitation|v_vmart_node0001
3|antihistamine scalados Vatter|v_vmart_node0001

Loading and using the example

Load and use the FileSource UDSource as follows:

=> --Load library and create the source function
=> CREATE LIBRARY JavaLib AS '/home/dbadmin/JavaUDlLib.jar'
-> LANGUAGE 'JAVA';
CREATE LIBRARY
=> CREATE SOURCE File as LANGUAGE 'JAVA' NAME
-> 'com.mycompany.UDL.FileSourceFactory' LIBRARY JavaLib;
CREATE SOURCE FUNCTION
=> --Create a table to hold the data loaded from files
=> CREATE TABLE t (i integer, text VARCHAR, node VARCHAR);
CREATE TABLE
=> -- Copy a single file from the current host using the FileSource
=> COPY t SOURCE File(file='/tmp/UDLdata01.txt');
 Rows Loaded
-------------
        1000
(1 row)

=> --See some of what got loaded.
=> SELECT * FROM t WHERE i < 5 ORDER BY i;
 i |             text              |  node
---+-------------------------------+-----------------
 0 | megabits embanks              | v_vmart_node0001
 1 | unneatly                      | v_vmart_node0001
 2 | self-precipitation            | v_vmart_node0001
 3 | antihistamine scalados Vatter | v_vmart_node0001
 4 | fate-menaced toilworn         | v_vmart_node0001
(5 rows)



=> TRUNCATE TABLE t;
TRUNCATE TABLE
=> -- Now load a file from three hosts. All of these hosts must have a file
=> -- named /tmp/UDLdata01.txt, each with different data
=> COPY t SOURCE File(file='/tmp/UDLdata01.txt',
-> nodes='v_vmart_node0001,v_vmart_node0002,v_vmart_node0003');
 Rows Loaded
-------------
        3000
(1 row)

=> --Now see what has been loaded
=> SELECT * FROM t WHERE i < 5 ORDER BY i,node ;
 i |                      text                       |  node
---+-------------------------------------------------+--------
 0 | megabits embanks                                | v_vmart_node0001
 0 | nimble-eyed undupability frowsier               | v_vmart_node0002
 0 | Circean nonrepellence nonnasality               | v_vmart_node0003
 1 | unneatly                                        | v_vmart_node0001
 1 | floatmaker trabacolos hit-in                    | v_vmart_node0002
 1 | revelrous treatableness Halleck                 | v_vmart_node0003
 2 | self-precipitation                              | v_vmart_node0001
 2 | whipcords archipelagic protodonatan copycutter  | v_vmart_node0002
 2 | Paganalian geochemistry short-shucks            | v_vmart_node0003
 3 | antihistamine scalados Vatter                   | v_vmart_node0001
 3 | swordweed touristical subcommanders desalinized | v_vmart_node0002
 3 | batboys                                         | v_vmart_node0003
 4 | fate-menaced toilworn                           | v_vmart_node0001
 4 | twice-wanted cirrocumulous                      | v_vmart_node0002
 4 | doon-head-clock                                 | v_vmart_node0003
(15 rows)

=> TRUNCATE TABLE t;
TRUNCATE TABLE
=> --Now copy from several files on several hosts
=> COPY t SOURCE File(file='/tmp/UDLdata01.txt,/tmp/UDLdata02.txt,/tmp/UDLdata03.txt'
-> ,nodes='v_vmart_node0001,v_vmart_node0002,v_vmart_node0003');
 Rows Loaded
-------------
        9000
(1 row)

=> SELECT * FROM t WHERE i = 0 ORDER BY node ;
 i |                    text                     |  node
---+---------------------------------------------+--------
 0 | Awolowo Mirabilis D'Amboise                 | v_vmart_node0001
 0 | sortieing Divisionism selfhypnotization     | v_vmart_node0001
 0 | megabits embanks                            | v_vmart_node0001
 0 | nimble-eyed undupability frowsier           | v_vmart_node0002
 0 | thiaminase hieroglypher derogated soilborne | v_vmart_node0002
 0 | aurigraphy crocket stenocranial             | v_vmart_node0002
 0 | Khulna pelmets                              | v_vmart_node0003
 0 | Circean nonrepellence nonnasality           | v_vmart_node0003
 0 | matterate protarsal                         | v_vmart_node0003
(9 rows)

Source implementation

The following code shows the source of the FileSource class that reads a file from the host file system. The constructor, which is called by FileSourceFactory.prepareUDSources(), gets the absolute path for the file containing the data to be read. The setup() method opens the file and the destroy() method closes it. The process() method reads from the file into a buffer provided by the instance of the DataBuffer class passed to it as a parameter. If the read operation filled the output buffer, it returns OUTPUT_NEEDED. This value tells Vertica to call the method again after the next stage of the load has processed the output buffer. If the read did not fill the output buffer, then process() returns DONE to indicate it has finished processing the data source.

package com.mycompany.UDL;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.io.RandomAccessFile;

import com.vertica.sdk.DataBuffer;
import com.vertica.sdk.ServerInterface;
import com.vertica.sdk.State.StreamState;
import com.vertica.sdk.UDSource;
import com.vertica.sdk.UdfException;

public class FileSource extends UDSource {

    private String filename;  // The file for this UDSource to read
    private RandomAccessFile reader;   // handle to read from file


    // The constructor just stores the absolute filename of the file it will
    // read.
    public FileSource(String filename) {
        super();
        this.filename = filename;
    }

    // Called before Vertica starts requesting data from the data source.
    // In this case, setup needs to open the file and save to the reader
    // property.
    @Override
    public void setup(ServerInterface srvInterface ) throws UdfException{
        try {
            reader = new RandomAccessFile(new File(filename), "r");
        } catch (FileNotFoundException e) {
            // In case of any error, throw a UDfException. This will terminate
            // the data load.
             String msg = e.getMessage();
             throw new UdfException(0, msg);
        }
    }

    // Called after data has been loaded. In this case, close the file handle.
    @Override
    public void destroy(ServerInterface srvInterface ) throws UdfException {
        if (reader != null) {
            try {
                reader.close();
            } catch (IOException e) {
                String msg = e.getMessage();
                 throw new UdfException(0, msg);
            }
        }
    }

    @Override
    public StreamState process(ServerInterface srvInterface, DataBuffer output)
                                throws UdfException {

        // Read up to the size of the buffer provided in the DataBuffer.buf
        // property. Here we read directly from the file handle into the
        // buffer.
        long offset;
        try {
            offset = reader.read(output.buf,output.offset,
                                 output.buf.length-output.offset);
        } catch (IOException e) {
            // Throw an exception in case of any errors.
            String msg = e.getMessage();
            throw new UdfException(0, msg);
        }

        // Update the number of bytes processed so far by the data buffer.
        output.offset +=offset;

        // See if the end of the data source has been reached, or if less data
        // was read than can fit in the buffer
        if(offset == -1 || offset < output.buf.length) {
            // No more data to read.
            return StreamState.DONE;
        }else{
            // Tell Vertica to call again when buffer has been emptied
            return StreamState.OUTPUT_NEEDED;
        }
    }
}

Factory implementation

The following code is a modified version of the example Java UDSource function provided in the Java UDx support package. You can find the full example in /opt/vertica/sdk/examples/JavaUDx/UDLFuctions/com/vertica/JavaLibs/FileSourceFactory.java. Its override of the plan() method verifies that the user supplied the required file parameter. If the user also supplied the optional nodes parameter, this method verifies that the nodes exist in the Vertica cluster. If there is a problem with either parameter, the method throws an exception to return an error to the user. If there are no issues with the parameters, the plan() method stores their values in the plan context object.

package com.mycompany.UDL;

import java.util.ArrayList;
import java.util.Vector;
import com.vertica.sdk.NodeSpecifyingPlanContext;
import com.vertica.sdk.ParamReader;
import com.vertica.sdk.ParamWriter;
import com.vertica.sdk.ServerInterface;
import com.vertica.sdk.SizedColumnTypes;
import com.vertica.sdk.SourceFactory;
import com.vertica.sdk.UDSource;
import com.vertica.sdk.UdfException;

public class FileSourceFactory extends SourceFactory {

    // Called once on the initiator host to do initial setup. Checks
    // parameters and chooses which nodes will do the work.
    @Override
    public void plan(ServerInterface srvInterface,
            NodeSpecifyingPlanContext planCtxt) throws UdfException {

        String nodes; // stores the list of nodes that will load data

        // Get a copy of the parameters the user supplied to the UDSource
        // function call.
        ParamReader args =  srvInterface.getParamReader();

        // A list of nodes that will perform work. This gets saved as part
        // of the plan context.
        ArrayList<String> executionNodes = new ArrayList<String>();

        // First, ensure the user supplied the file parameter
        if (!args.containsParameter("file")) {
            // Without a file parameter, we cannot continue. Throw an
            // exception that will be caught by the Java UDx framework.
            throw new UdfException(0, "You must supply a file parameter");
        }

        // If the user specified nodes to read the file, parse the
        // comma-separated list and save. Otherwise, assume just the
        // Initiator node has the file to read.
        if (args.containsParameter("nodes")) {
            nodes = args.getString("nodes");

            // Get list of nodes in cluster, to ensure that the node the
            // user specified actually exists. The list of nodes is available
            // from the planCtxt (plan context) object.
            ArrayList<String> clusterNodes = planCtxt.getClusterNodes();

            // Parse the string parameter "nodes" which
            // is a comma-separated list of node names.
            String[] nodeNames = nodes.split(",");

            for (int i = 0; i < nodeNames.length; i++){
                // See if the node the user gave us actually exists
                if(clusterNodes.contains(nodeNames[i]))
                    // Node exists. Add it to list of nodes.
                    executionNodes.add(nodeNames[i]);
                else{
                    // User supplied node that doesn't exist. Throw an
                    // exception so the user is notified.
                    String msg = String.format("Specified node '%s' but no" +
                        " node by that name is available.  Available nodes "
                        + "are \"%s\".",
                        nodeNames[i], clusterNodes.toString());
                    throw new UdfException(0, msg);
                }
            }
        } else {
            // User did not supply a list of node names. Assume the initiator
            // is the only host that will read the file. The srvInterface
            // instance passed to this method has a getter for the current
            // node.
            executionNodes.add(srvInterface.getCurrentNodeName());
        }

        // Set the target node(s) in the plan context
        planCtxt.setTargetNodes(executionNodes);

        // Set parameters for each node reading data that tells it which
        // files it will read. In this simple example, just tell it to
        // read all of the files the user passed in the file parameter
        String files = args.getString("file");

        // Get object to write parameters into the plan context object.
        ParamWriter nodeParams = planCtxt.getWriter();

        // Loop through list of execution nodes, and add a parameter to plan
        // context named for each node performing the work, which tells it the
        // list of files it will process. Each node will look for a
        // parameter named something like "filesForv_vmart_node0002" in its
        // prepareUDSources() method.
        for (int i = 0; i < executionNodes.size(); i++) {
            nodeParams.setString("filesFor" + executionNodes.get(i), files);
        }
    }

    // Called on each host that is reading data from a source. This method
    // returns an array of UDSource objects that process each source.
    @Override
    public ArrayList<UDSource> prepareUDSources(ServerInterface srvInterface,
            NodeSpecifyingPlanContext planCtxt) throws UdfException {

        // An array to hold the UDSource subclasses that we instantiate
        ArrayList<UDSource> retVal = new ArrayList<UDSource>();

        // Get the list of files this node is supposed to process. This was
        // saved by the plan() method in the plancontext
        String myName = srvInterface.getCurrentNodeName();
        ParamReader params = planCtxt.getReader();
        String fileNames = params.getString("filesFor" + myName);

        // Note that you can also be lazy and grab the parameters the user
        // passed to the UDSource function in the COPY statement directly
        // by getting parameters from the ServerInterface object. I.e.:

        //String fileNames = srvInterface.getParamReader().getString("file");

        // Split comma-separated list into a single list.
        String[] fileList = fileNames.split(",");
        for (int i = 0; i < fileList.length; i++){
            // Instantiate a FileSource object (which is a subclass of UDSource)
            // to read each file. The constructor for FileSource takes the
            // name of the file to read.
            retVal.add(new FileSource(fileList[i]));
        }

        // Return the collection of FileSource objects. They will be called,
        // in turn, to read each of the files.
        return retVal;
    }

    // Declares the parameters that this factory accepts.
    @Override
    public void getParameterType(ServerInterface srvInterface,
                                    SizedColumnTypes parameterTypes) {
        parameterTypes.addVarchar(65000, "file");
        parameterTypes.addVarchar(65000, "nodes");
    }
}

2 - User-defined filter

User-defined filter functions allow you to manipulate data obtained from a source in various ways. For example, a filter can:

  • Process a compressed file in a compression format not natively supported by Vertica.

  • Take UTF-16-encoded data and transcode it to UTF-8 encoding.

  • Perform search-and-replace operations on data before it is loaded into Vertica.

You can also process data through multiple filters before it is loaded into Vertica. For instance, you could unzip a file compressed with GZip, convert the content from UTF-16 to UTF-8, and finally search and replace certain text strings.

If you implement a UDFilter, you must also implement a corresponding FilterFactory.

See UDFilter class and FilterFactory class for API details.

2.1 - UDFilter class

The UDFilter class is responsible for reading raw input data from a source and preparing it to be loaded into Vertica or processed by a parser. This preparation may involve decompression, re-encoding, or any other sort of binary manipulation.

A UDFilter is instantiated by a corresponding FilterFactory on each host in the Vertica cluster that is performing filtering for the data source.

UDFilter methods

Your UDFilter subclass must override process() or processWithMetadata():

  • process() reads the raw input stream as one large file. If there are any errors or failures, the entire load fails.
    You can implement process() when the upstream source implements processWithMetadata(), but it might result in parsing errors.

  • processWithMetadata() is useful when the data source has metadata about record boundaries available in some structured format that's separate from the data payload. With this interface, the source emits a record length for each record in addition to the data.

    By implementing processWithMetadata() instead of process() in each phase, you can retain this record length metadata throughout the load stack, which enables a more efficient parse that can recover from errors on a per-message basis, rather than a per-file or per-source basis. KafkaSource and the Kafka parsers (KafkaAvroParser, KafkaJSONParser, and KafkaParser) use this mechanism to support per-Kafka-message rejections when individual Kafka messages are corrupted.

    Using processWithMetadata() with your UDFilter subclass enables you to write an internal filter that integrates the record length metadata from the source into the data stream, producing a single byte stream with boundary information to help parsers extract and process individual messages. KafkaInsertDelimiters and KafkaInsertLengths use this mechanism to insert message boundary information into Kafka data streams.

Optionally, you can override other UDFilter class methods.

Filter execution

The following sections detail the execution sequence each time a user-defined filter is called, for a filter that overrides the process() method.

Setting Up
COPY calls setup() before the first time it calls process(). Use setup() to perform any necessary setup steps that your filter needs to operate, such as initializing data structures to be used during filtering. Your object might be destroyed and re-created during use, so make sure that your object is restartable.

Filtering Data
COPY calls process() repeatedly during query execution to filter data. The method receives two instances of the DataBuffer class among its parameters, an input and an output buffer. Your implementation should read from the input buffer, manipulate it in some manner (such as decompressing it), and write the result to the output. A one-to-one correlation between the number of bytes your implementation reads and the number it writes might not exist. The process() method should process data until it either runs out of data to read or runs out of space in the output buffer. When one of these conditions occurs, your method should return one of the following values defined by StreamState:

  • OUTPUT_NEEDED if the filter needs more room in its output buffer.

  • INPUT_NEEDED if the filter has run out of input data (but the data source has not yet been fully processed).

  • DONE if the filter has processed all of the data in the data source.

  • KEEP_GOING if the filter cannot proceed for an extended period of time. The method will be called again, so do not block indefinitely. If you do, then you prevent your user from canceling the query.

Before returning, your process() method must set the offset property in each DataBuffer. In the input buffer, set it to the number of bytes that the method successfully read. In the output buffer, set it to the number of bytes the method wrote. Setting these properties allows the next call to process() to resume reading and writing data at the correct points in the buffers.

Your process() method also needs to check the InputState object passed to it to determine if there is more data in the data source. When this object is equal to END_OF_FILE, then the data remaining in the input buffer is the last of the data in the data source. Once it has processed all of the remaining data, process() must return DONE.
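
The following pass-through process() is a minimal sketch of this contract; it assumes <algorithm> and <cstring> for std::min() and memcpy(), and a real filter would transform the bytes rather than copy them unchanged.

virtual StreamState process(ServerInterface &srvInterface, DataBuffer &input,
                            InputState input_state, DataBuffer &output) {
    // Copy as many bytes as both buffers allow.
    size_t toCopy = std::min(input.size - input.offset, output.size - output.offset);
    memcpy(output.buf + output.offset, input.buf + input.offset, toCopy);

    // Record how much was consumed and produced so the next call resumes correctly.
    input.offset += toCopy;
    output.offset += toCopy;

    if (input.offset == input.size) {
        // All available input is consumed; either the source is done or we need more.
        return (input_state == END_OF_FILE) ? DONE : INPUT_NEEDED;
    }
    return OUTPUT_NEEDED;   // the output buffer is full
}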

Tearing Down
COPY calls destroy() after the last time it calls process(). This method frees any resources reserved by the setup() or process() methods. Vertica calls this method after the process() method indicates it has finished filtering all of the data in the data stream.

If there are still data sources that have not yet been processed, Vertica may later call setup() on the object again. On subsequent calls Vertica directs the method to filter the data in a new data stream. Therefore, your destroy() method should leave an object of your UDFilter subclass in a state where the setup() method can prepare it to be reused.

API

The C++ UDFilter API provides the following methods for extension by subclasses:

virtual void setup(ServerInterface &srvInterface);

virtual bool useSideChannel();

virtual StreamState process(ServerInterface &srvInterface, DataBuffer &input, InputState input_state, DataBuffer &output)=0;

virtual StreamState processWithMetadata(ServerInterface &srvInterface, DataBuffer &input,
    LengthBuffer &input_lengths, InputState input_state, DataBuffer &output, LengthBuffer &output_lengths)=0;

virtual void cancel(ServerInterface &srvInterface);

virtual void destroy(ServerInterface &srvInterface);

The Java UDFilter API provides the following methods for extension by subclasses:

public void setup(ServerInterface srvInterface) throws UdfException;

public abstract StreamState process(ServerInterface srvInterface, DataBuffer input,
                InputState input_state, DataBuffer output)
    throws UdfException;

protected void cancel(ServerInterface srvInterface);

public void destroy(ServerInterface srvInterface) throws UdfException;

The Python UDFilter API provides the following methods for extension by subclasses:

class PyUDFilter(vertica_sdk.UDFilter):
    def __init__(self):
        pass

    def setup(self, srvInterface):
        pass

    def process(self, srvInterface, inputbuffer, outputbuffer, inputstate):
        # Process the data here and write the result into outputbuffer.
        return StreamState.DONE

2.2 - FilterFactory class

If you write a filter, you must also write a filter factory to produce filter instances. To do so, subclass the FilterFactory class.

Your subclass performs the initial validation and planning of the function execution and instantiates UDFilter objects on each host that will be filtering data.

Filter factories are singletons. Your subclass must be stateless, with no fields containing data. The subclass also must not modify any global variables.

FilterFactory methods

The FilterFactory class defines the following methods. Your subclass must override the prepare() method. It may override the other methods.

Setting up

Vertica calls plan() once on the initiator node, to perform the following tasks:

  • Check any parameters that have been passed from the function call in the COPY statement and provide error messages if there are any issues. You read the parameters by getting a ParamReader object from the instance of ServerInterface passed into your plan() method.

  • Store any information that the individual hosts need in order to filter data in the PlanContext instance passed as a parameter. For example, you could store details of the input format that the filter will read and the output format that the filter should produce. The plan() method runs only on the initiator node, and the prepare() method runs on each host reading from a data source. Therefore, this object is the only means of communication between them.

    You store data in the PlanContext by getting a ParamWriter object from the getWriter() method. You then write parameters by calling methods on the ParamWriter such as setString().

Creating filters

Vertica calls prepare() to create and initialize your filter. It calls this method once on each node that will perform filtering. Vertica automatically selects the best nodes to complete the work based on available resources. You cannot specify the nodes on which the work is done.
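
For example, a prepare() implementation might look like the following sketch. The MyFilter class and its encoding parameter are hypothetical; the allocation pattern follows the CurlSourceFactory example shown earlier.

virtual UDFilter * prepare(ServerInterface &srvInterface, PlanContext &planCtxt) {
    // Read a parameter supplied in the COPY statement (or stored by plan()).
    std::string encoding =
        srvInterface.getParamReader().getStringRef("encoding").str();
    // Build the filter with the per-query allocator so Vertica manages its lifetime.
    return vt_createFuncObj(srvInterface.allocator, MyFilter, encoding);
}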

Defining parameters

Implement getParameterTypes() to define the names and types of parameters that your filter uses. Vertica uses this information to warn callers about unknown or missing parameters. Vertica ignores unknown parameters and uses default values for missing parameters. While you should define the types and parameters for your function, you are not required to override this method.
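
Continuing the hypothetical MyFilter sketch, the factory could declare its single parameter as follows.

virtual void getParameterType(ServerInterface &srvInterface,
                              SizedColumnTypes &parameterTypes) {
    // Declare the (hypothetical) "encoding" parameter so Vertica can warn
    // callers about unknown or missing parameters.
    parameterTypes.addVarchar(32, "encoding");
}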

API

The C++ FilterFactory API provides the following methods for extension by subclasses:

virtual void plan(ServerInterface &srvInterface, PlanContext &planCtxt);

virtual UDFilter * prepare(ServerInterface &srvInterface, PlanContext &planCtxt)=0;

virtual void getParameterType(ServerInterface &srvInterface, SizedColumnTypes &parameterTypes);

After creating your FilterFactory, you must register it with the RegisterFactory macro.

The Java FilterFactory API provides the following methods for extension by subclasses:

public void plan(ServerInterface srvInterface, PlanContext planCtxt)
    throws UdfException;

public abstract UDFilter prepare(ServerInterface srvInterface, PlanContext planCtxt)
    throws UdfException;

public void getParameterType(ServerInterface srvInterface, SizedColumnTypes parameterTypes);

The Python FilterFactory API provides the following methods for extension by subclasses:


class PyFilterFactory(vertica_sdk.FilterFactory):
    def __init__(self):
        pass
    def plan(self):
        pass
    def prepare(self, planContext):
        # Implement this method to create and return PyUDFilter instances.
        pass

2.3 - Java example: ReplaceCharFilter

The example in this section demonstrates creating a UDFilter that replaces any occurrences of a character in the input stream with another character in the output stream. This example is highly simplified and assumes the input stream is ASCII data.

Always remember that the input and output streams in a UDFilter are actually binary data. If you are performing character transformations using a UDFilter, convert the data stream from a string of bytes into a properly encoded string. For example, your input stream might consist of UTF-8 encoded text. If so, be sure to transform the raw binary being read from the buffer into a UTF string before manipulating it.

Loading and using the example

The example UDFilter has two required parameters. The from_char parameter specifies the character to be replaced, and the to_char parameter specifies the replacement character. Load and use the ReplaceCharFilter UDFilter as follows:

=> CREATE LIBRARY JavaLib AS '/home/dbadmin/JavaUDlLib.jar'
->LANGUAGE 'JAVA';
CREATE LIBRARY
=> CREATE FILTER ReplaceCharFilter as LANGUAGE 'JAVA'
->name 'com.mycompany.UDL.ReplaceCharFilterFactory' library JavaLib;
CREATE FILTER FUNCTION
=> CREATE TABLE t (text VARCHAR);
CREATE TABLE
=> COPY t FROM STDIN WITH FILTER ReplaceCharFilter(from_char='a', to_char='z');
Enter data to be copied followed by a newline.
End with a backslash and a period on a line by itself.
>> Mary had a little lamb
>> a man, a plan, a canal, Panama
>> \.

=> SELECT * FROM t;
              text
--------------------------------
 Mzry hzd z little lzmb
 z mzn, z plzn, z cznzl, Pznzmz
(2 rows)

=> --Calling the filter with incorrect parameters returns errors
=> COPY t from stdin with filter ReplaceCharFilter();
ERROR 3399:  Failure in UDx RPC call InvokePlanUDL(): Error in User Defined Object [
ReplaceCharFilter], error code: 0
com.vertica.sdk.UdfException: You must supply two parameters to ReplaceChar: 'from_char' and 'to_char'
        at com.vertica.JavaLibs.ReplaceCharFilterFactory.plan(ReplaceCharFilterFactory.java:22)
        at com.vertica.udxfence.UDxExecContext.planUDFilter(UDxExecContext.java:889)
        at com.vertica.udxfence.UDxExecContext.planCurrentUDLType(UDxExecContext.java:865)
        at com.vertica.udxfence.UDxExecContext.planUDL(UDxExecContext.java:821)
        at com.vertica.udxfence.UDxExecContext.run(UDxExecContext.java:242)
        at java.lang.Thread.run(Thread.java:662)

Filter implementation

The ReplaceCharFilter class reads the data stream, replacing each occurrence of a user-specified character with another character.

package com.vertica.JavaLibs;

import com.vertica.sdk.DataBuffer;
import com.vertica.sdk.ServerInterface;
import com.vertica.sdk.State.InputState;
import com.vertica.sdk.State.StreamState;
import com.vertica.sdk.UDFilter;

public class ReplaceCharFilter extends UDFilter {

    private byte[] fromChar;
    private byte[] toChar;

    public ReplaceCharFilter(String fromChar, String toChar){
        // Stores the from char and to char as byte arrays. This is
        // not a robust method of doing this, but works for this simple
        // example.
        this.fromChar= fromChar.getBytes();
        this.toChar=toChar.getBytes();
    }
    @Override
    public StreamState process(ServerInterface srvInterface, DataBuffer input,
            InputState input_state, DataBuffer output) {

        // Check if there is no more input and the input buffer has been
        // completely processed. If so, filtering is done.
        if (input_state == InputState.END_OF_FILE && input.offset == input.buf.length) {
            return StreamState.DONE;
        }

        // Determine how many bytes to process. This is either until the input
        // buffer is exhausted or the output buffer is filled, whichever comes first.
        int limit = Math.min((input.buf.length - input.offset),
                (output.buf.length - output.offset));

        for (int i = 0; i < limit; i++) {
            // This example just replaces each instance of from_char
            // with to_char. It does not consider things such as multi-byte
            // UTF-8 characters.
            if (input.buf[input.offset + i] == fromChar[0]) {
                output.buf[output.offset + i] = toChar[0];
            } else {
                // Did not find from_char, so copy input to the output
                output.buf[output.offset + i] = input.buf[input.offset + i];
            }
        }

        // Advance both buffers past the bytes that were just processed.
        input.offset += limit;
        output.offset += limit;

        if (input.buf.length - input.offset < output.buf.length - output.offset) {
            return StreamState.INPUT_NEEDED;
        } else {
            return StreamState.OUTPUT_NEEDED;
        }
    }
}

Factory implementation

ReplaceCharFilterFactory requires two parameters (from_char and to_char). The plan() method verifies that these parameters exist and are single-character strings, and then stores them in the plan context. The prepare() method reads the parameter values from the plan context and passes them to the ReplaceCharFilter object that it instantiates to perform the filtering.

package com.vertica.JavaLibs;

import java.util.ArrayList;
import java.util.Vector;

import com.vertica.sdk.FilterFactory;
import com.vertica.sdk.PlanContext;
import com.vertica.sdk.ServerInterface;
import com.vertica.sdk.SizedColumnTypes;
import com.vertica.sdk.UDFilter;
import com.vertica.sdk.UdfException;

public class ReplaceCharFilterFactory extends FilterFactory {

    // Run on the initiator node to perform verification and basic setup.
    @Override
    public void plan(ServerInterface srvInterface,PlanContext planCtxt)
                     throws UdfException {
        ArrayList<String> args =
                          srvInterface.getParamReader().getParamNames();

        // Ensure user supplied two arguments
        if (!(args.contains("from_char") && args.contains("to_char"))) {
            throw new UdfException(0, "You must supply two parameters" +
                        " to ReplaceChar: 'from_char' and 'to_char'");
        }

        // Verify that the from_char is a single character.
        String fromChar = srvInterface.getParamReader().getString("from_char");
        if (fromChar.length() != 1) {
            String message =  String.format("Replacechar expects a single " +
                "character in the 'from_char' parameter. Got length %d",
                fromChar.length());
            throw new UdfException(0, message);
        }

        // Save the from character in the plan context, to be read by
        // prepare() method.
        planCtxt.getWriter().setString("fromChar",fromChar);

        // Ensure the to_char parameter is a single character
        String toChar = srvInterface.getParamReader().getString("to_char");
        if (toChar.length() != 1) {
            String message =  String.format("Replacechar expects a single "
                 + "character in the 'to_char' parameter. Got length %d",
                toChar.length());
            throw new UdfException(0, message);
        }
        // Save the to character in the plan data
        planCtxt.getWriter().setString("toChar",toChar);
    }

    // Called on every host that will filter data. Must instantiate the
    // UDFilter subclass.
    @Override
    public UDFilter prepare(ServerInterface srvInterface, PlanContext planCtxt)
                            throws UdfException {
        // Get data stored in the context by the plan() method.
        String fromChar = planCtxt.getWriter().getString("fromChar");
        String toChar = planCtxt.getWriter().getString("toChar");

        // Instantiate a filter object to perform filtering.
        return new ReplaceCharFilter(fromChar, toChar);
    }

    // Describe the parameters accepted by this filter.
    @Override
    public void getParameterType(ServerInterface srvInterface,
             SizedColumnTypes parameterTypes) {
        parameterTypes.addVarchar(1, "from_char");
        parameterTypes.addVarchar(1, "to_char");
    }
}

2.4 - C++ example: converting encoding

The following example shows how you can convert encoding for a file from one type to another by converting UTF-16 encoded data to UTF-8.

The following example shows how you can convert encoding for a file from one type to another by converting UTF-16 encoded data to UTF-8. You can find this example in the SDK at /opt/vertica/sdk/examples/FilterFunctions/IConverter.cpp.

Filter implementation

class Iconverter : public UDFilter{
private:
    std::string fromEncoding, toEncoding;
    iconv_t cd; // the conversion descriptor opened
    uint converted; // how many characters have been converted
protected:
    virtual StreamState process(ServerInterface &srvInterface, DataBuffer &input,
                                InputState input_state, DataBuffer &output)
    {
        char *input_buf = (char *)input.buf + input.offset;
        char *output_buf = (char *)output.buf + output.offset;
        size_t inBytesLeft = input.size - input.offset, outBytesLeft = output.size - output.offset;
        // end of input
        if (input_state == END_OF_FILE && inBytesLeft == 0)
        {
            // Gnu libc iconv doc says, it is good practice to finalize the
            // outbuffer for stateful encodings (by calling with null inbuffer).
            //
            // http://www.gnu.org/software/libc/manual/html_node/Generic-Conversion-Interface.html
            iconv(cd, NULL, NULL, &output_buf, &outBytesLeft);
            // output buffer can be updated by this operation
            output.offset = output.size - outBytesLeft;
            return DONE;
        }
        size_t ret = iconv(cd, &input_buf, &inBytesLeft, &output_buf, &outBytesLeft);
        // if conversion is successful, we ask for more input, as input has not reached EOF.
        StreamState retStatus = INPUT_NEEDED;
        if (ret == (size_t)(-1))
        {
            // seen an error
            switch (errno)
            {
            case E2BIG:
                // input size too big, not a problem, ask for more output.
                retStatus = OUTPUT_NEEDED;
                break;
            case EINVAL:
                // input stops in the middle of a byte sequence, not a problem, ask for more input
                retStatus = input_state == END_OF_FILE ? DONE : INPUT_NEEDED;
                break;
            case EILSEQ:
                // invalid sequence seen, throw
                // TODO: reporting the wrong byte position
                vt_report_error(1, "Invalid byte sequence when doing %u-th conversion", converted);
            case EBADF:
                // something wrong with descriptor, throw
                vt_report_error(0, "Invalid descriptor");
            default:
                vt_report_error(0, "Uncommon Error");
                break;
            }
        }
        else converted += ret;
        // move position pointer
        input.offset = input.size - inBytesLeft;
        output.offset = output.size - outBytesLeft;
        return retStatus;
    }
public:
    Iconverter(const std::string &from, const std::string &to)
    : fromEncoding(from), toEncoding(to), converted(0)
    {
        // note "to encoding" is first argument to iconv...
        cd = iconv_open(to.c_str(), from.c_str());
        if (cd == (iconv_t)(-1))
        {
            // error when creating converters.
            vt_report_error(0, "Error initializing iconv: %m");
        }
    }
    ~Iconverter()
    {
        // free iconv resources;
        iconv_close(cd);
    }
};

Factory implementation

class IconverterFactory : public FilterFactory{
public:
    virtual void plan(ServerInterface &srvInterface,
            PlanContext &planCtxt) {
        std::vector<std::string> args = srvInterface.getParamReader().getParamNames();
        /* Check parameters */
        if (!(args.size() == 0 ||
                (args.size() == 1 && find(args.begin(), args.end(), "from_encoding")
                        != args.end()) || (args.size() == 2
                        && find(args.begin(), args.end(), "from_encoding") != args.end()
                        && find(args.begin(), args.end(), "to_encoding") != args.end()))) {
            vt_report_error(0, "Invalid arguments.  Must specify either no arguments,  or "
                               "'from_encoding' alone, or 'from_encoding' and 'to_encoding'.");
        }
        /* Populate planData */
        // By default, we do UTF16->UTF8, and x->UTF8
        VString from_encoding = planCtxt.getWriter().getStringRef("from_encoding");
        VString to_encoding = planCtxt.getWriter().getStringRef("to_encoding");
        from_encoding.copy("UTF-16");
        to_encoding.copy("UTF-8");
        if (args.size() == 2)
        {
            from_encoding.copy(srvInterface.getParamReader().getStringRef("from_encoding"));
            to_encoding.copy(srvInterface.getParamReader().getStringRef("to_encoding"));
        }
        else if (args.size() == 1)
        {
            from_encoding.copy(srvInterface.getParamReader().getStringRef("from_encoding"));
        }
        if (!from_encoding.length()) {
            vt_report_error(0, "The empty string is not a valid from_encoding value");
        }
        if (!to_encoding.length()) {
            vt_report_error(0, "The empty string is not a valid to_encoding value");
        }
    }
    virtual UDFilter* prepare(ServerInterface &srvInterface,
            PlanContext &planCtxt) {
        return vt_createFuncObj(srvInterface.allocator, Iconverter,
                planCtxt.getReader().getStringRef("from_encoding").str(),
                planCtxt.getReader().getStringRef("to_encoding").str());
    }
    virtual void getParameterType(ServerInterface &srvInterface,
                                  SizedColumnTypes &parameterTypes) {
        parameterTypes.addVarchar(32, "from_encoding");
        parameterTypes.addVarchar(32, "to_encoding");
    }
};
RegisterFactory(IconverterFactory);

3 - User-defined parser

A parser takes a stream of bytes and passes a corresponding sequence of tuples to the Vertica load process.

A parser takes a stream of bytes and passes a corresponding sequence of tuples to the Vertica load process. You can use user-defined parser functions to parse:

  • Data in formats not understood by the Vertica built-in parser.

  • Data that requires more specific control than the built-in parser supplies.

For example, you can load a CSV file using a specific CSV library. See the Vertica SDK for two CSV examples.

COPY supports a single user-defined parser that you can use with a user-defined source and zero or more instances of a user-defined filter. If you implement a UDParser class, you must also implement a corresponding ParserFactory.

Sometimes, you can improve the performance of your parser by adding a chunker. A chunker divides up the input and uses multiple threads to parse it. Chunkers are available only in the C++ API. For details, see Cooperative parse and UDChunker class. Under special circumstances you can further improve performance by using apportioned load, an approach where multiple Vertica nodes parse the input.

3.1 - UDParser class

You can subclass the UDParser class when you need to parse data that is in a format that the COPY statement's native parser cannot handle.

You can subclass the UDParser class when you need to parse data that is in a format that the COPY statement's native parser cannot handle.

During parser execution, Vertica always calls three methods: setup(), process(), and destroy(). It might also call getRejectedRecord().

UDParser constructor

The UDParser class performs important initialization required by all subclasses, including initializing the StreamWriter object used by the parser. Therefore, your constructor must call super().

UDParser methods

Your UDParser subclass must override process() or processWithMetadata():

  • process() reads the raw input stream as one large file. If there are any errors or failures, the entire load fails. You can implement process() when the upstream source or filter implements processWithMetadata(), but doing so might result in parsing errors.

  • processWithMetadata() is useful when the data source has metadata about record boundaries available in some structured format that's separate from the data payload. With this interface, the source emits a record length for each record in addition to the data.

    By implementing processWithMetadata() instead of process() in each phase, you can retain this record length metadata throughout the load stack, which enables a more efficient parse that can recover from errors on a per-message basis, rather than a per-file or per-source basis. KafkaSource and Kafka parsers (KafkaAvroParser, KafkaJSONParser, and KafkaParser) use this mechanism to support per-Kafka-message rejections when individual Kafka messages are corrupted.

Additionally, you must override getRejectedRecord() to return information about rejected records.

Optionally, you can override the other UDParser class methods.

Parser execution

The following sections detail the execution sequence each time a user-defined parser is called. The description assumes a parser that overrides the process() method.

Setting up

COPY calls setup() before the first time it calls process(). Use setup() to perform any initial setup tasks that your parser needs to parse data, such as retrieving parameters from the class context structure or initializing data structures for use during parsing. Your object might be destroyed and re-created during use, so make sure that your object is restartable.
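
As an illustration only, a setup() implementation might look like the following sketch; the delimiter parameter and the delimiter and numCols member fields are hypothetical:

virtual void setup(ServerInterface &srvInterface, SizedColumnTypes &returnType)
{
    // Re-read parameters on every call, because the object can be destroyed
    // and re-created between data streams.
    if (srvInterface.getParamReader().containsParameter("delimiter")) {
        std::string d = srvInterface.getParamReader().getStringRef("delimiter").str();
        if (!d.empty()) delimiter = d[0];
    }

    // Record how many output columns each parsed row must supply.
    numCols = returnType.getColumnCount();
}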

Parsing

COPY calls process() repeatedly during query execution. Vertica passes this method a buffer of data to parse into columns and rows and one of the following input states defined by InputState:

  • OK: currently at the start of or in the middle of a stream

  • END_OF_FILE: no further data is available.

  • END_OF_CHUNK: the current data ends on a record boundary and the parser should consume all of it before returning. This input state only occurs when using a chunker.

  • START_OF_PORTION: the input does not start at the beginning of a source. The parser should find the first end-of-record mark. This input state only occurs when using apportioned load. You can use the getPortion() method to access the offset and size of the portion.

  • END_OF_PORTION: the source has reached the end of its portion. The parser should finish processing the last record it started and advance no further. This input state only occurs when using apportioned load.

The parser must reject any data that it cannot parse, so that Vertica can report the rejection and write the rejected data to files.

The process() method must parse as much data as it can from the input buffer. The buffer might not end on a row boundary. Therefore, it might have to stop parsing in the middle of a row of input and ask for more data. The input can contain null bytes, if the source file contains them, and is not automatically null-terminated.

A parser has an associated StreamWriter object, which performs the actual writing of the data. When your parser extracts a column value, it uses one of the type-specific methods on StreamWriter to write that value to the output stream. See Writing Data for more information about these methods.

A single call to process() might write several rows of data. When your parser finishes processing a row of data, it must call next() on its StreamWriter to advance the output stream to a new row. (Usually a parser finishes processing a row because it encounters an end-of-row marker.)

When your process() method reaches the end of the buffer, it tells Vertica its current state by returning one of the following values defined by StreamState:

  • INPUT_NEEDED: the parser has reached the end of the buffer and needs more data to parse.

  • DONE: the parser has reached the end of the input data stream.

  • REJECT: the parser has rejected the last row of data it read (see Rejecting Rows).

Tearing down

COPY calls destroy() after the last time that process() is called. It frees any resources reserved by the setup() or process() method.

Vertica calls this method after the process() method indicates it has completed parsing the data source. However, sometimes data sources that have not yet been processed might remain. In such cases, Vertica might later call setup() on the object again and have it parse the data in a new data stream. Therefore, write your destroy() method so that it leaves an instance of your UDParser subclass in a state where setup() can be safely called again.

Reporting rejections

If process() rejects a row, Vertica calls getRejectedRecord() to report it. Usually, this method returns an instance of the RejectedRecord class with details of the rejected row.

Writing data

A parser has an associated StreamWriter object, which you access by calling getStreamWriter(). In your process() implementation, use the setType() methods on the StreamWriter object to write values in a row to specific column indexes. Verify that the data types you write match the data types expected by the schema.

The following example shows how you can write a value of type long to the fourth column (index 3) in the current row:

StreamWriter writer = getStreamWriter();
...
writer.setLongValue(3, 98);

StreamWriter provides methods for all the basic types, such as setBooleanValue(), setStringValue(), and so on. See the API documentation for a complete list of StreamWriter methods, including options that take primitive types or explicitly set entries to null.

Rejecting rows

If your parser finds data it cannot parse, it should reject the row by doing the following (a sketch appears after this list):

  1. Saving details about the rejected row data and the reason for the rejection. These pieces of information can be directly stored in a RejectedRecord object, or in fields on your UDParser subclass, until they are needed.

  2. Updating the position in the input buffer by advancing input.offset so that parsing can resume with the next row.

  3. Signaling that it has rejected a row by returning with the value StreamState.REJECT.

  4. Returning an instance of the RejectedRecord class with the details about the rejected row.
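
The following minimal sketch (not one of the shipped SDK examples) shows this flow in the C++ API, assuming the parser keeps std::string members named rejectReason and rejectedRow:

// Inside process(), when a row starting at rowStart and ending at rowEnd
// cannot be parsed:
//     rejectReason = "Malformed row";
//     rejectedRow = std::string(input.buf + rowStart, rowEnd - rowStart);
//     input.offset = rowEnd + 1;   // skip past the bad row
//     return REJECT;

// Vertica then calls getRejectedRecord() to log the rejection:
virtual RejectedRecord getRejectedRecord()
{
    return RejectedRecord(rejectReason, rejectedRow.c_str(),
                          rejectedRow.size(), "\n");
}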

Breaking up large loads

Vertica provides two ways to break up large loads. Apportioned load allows you to distribute a load among several database nodes. Cooperative parse (C++ only) allows you to distribute a load among several threads on one node.

API

The UDParser API provides the following methods for extension by subclasses:

virtual void setup(ServerInterface &srvInterface, SizedColumnTypes &returnType);

virtual bool useSideChannel();

virtual StreamState process(ServerInterface &srvInterface, DataBuffer &input, InputState input_state)=0;

virtual StreamState processWithMetadata(ServerInterface &srvInterface, DataBuffer &input,
                            LengthBuffer &input_lengths, InputState input_state)=0;

virtual void cancel(ServerInterface &srvInterface);

virtual void destroy(ServerInterface &srvInterface, SizedColumnTypes &returnType);

virtual RejectedRecord getRejectedRecord();

The UDParser API provides the following methods for extension by subclasses:

public void setup(ServerInterface srvInterface, SizedColumnTypes returnType)
    throws UdfException;

public abstract StreamState process(ServerInterface srvInterface,
                DataBuffer input, InputState input_state)
    throws UdfException, DestroyInvocation;

protected void cancel(ServerInterface srvInterface);

public void destroy(ServerInterface srvInterface, SizedColumnTypes returnType)
    throws UdfException;

public RejectedRecord getRejectedRecord() throws UdfException;

A UDParser uses a StreamWriter to write its output. StreamWriter provides methods for all the basic types, such as setBooleanValue(), setStringValue(), and so on. In the Java API this class also provides the setValue() method, which automatically sets the data type.

The methods described so far write single column values. StreamWriter also provides a method to write a complete row from a map. The setRowFromMap() method takes a map of column names and values and writes all the values into their corresponding columns. This method does not define new columns but instead writes values only to existing columns. The JsonParser example uses this method to write arbitrary JSON input. (See Java example: JSON parser.)

setRowFromMap() also populates any VMap ('raw') column of Flex Tables (see Flex tables) with the entire provided map. For most cases, setRowFromMap() is the appropriate way to populate a Flex Table. However, you can also generate a VMap value into a specified column using setVMap(), similar to other setValue() methods.

The setRowFromMap() method automatically coerces the input values into the types defined for those columns using an associated TypeCoercion. In most cases, using the default implementation (StandardTypeCoercion) is appropriate.

TypeCoercion uses policies to govern its behavior. For example, the FAIL_INVALID_INPUT_VALUE policy means invalid input is treated as an error instead of using a null value. Errors are caught and handled as rejections (see "Rejecting Rows" in User-defined parser). Policies also govern whether input that is too long is truncated. Use the setPolicy() method on the parser's TypeCoercion to set policies. See the API documentation for supported values.

You might need to customize type coercion beyond setting these policies. To do so, subclass one of the provided implementations of TypeCoercion and override the asType() methods. Such customization could be necessary if your parser reads objects that come from a third-party library. A parser handling geo-coordinates, for example, might override asLong to translate inputs like "40.4397N" into numbers. See the Vertica API documentation for a list of implementations.

The UDParser API provides the following methods for extension by subclasses:


class PyUDParser(vertica_sdk.UDParser):
    def __init__(self):
        pass
    def setup(self, srvInterface, returnType):
        pass
    def process(self, srvInterface, inputbuffer, inputstate, streamwriter):
        # Implement the process function here.
        # Read data from inputbuffer and parse it.
        # Rows are emitted via the streamwriter argument.
        return StreamState.DONE

In Python, the process() method requires an input buffer (see InputBuffer and OutputBuffer APIs) and a stream writer. The input buffer represents the source of the information that you want to parse. The stream writer delivers the parsed rows to Vertica.

In the event the parser rejects a record, use the method REJECT() to identify the rejected data and the reason for the rejection.

3.2 - UDChunker class

You can subclass the UDChunker class to allow your parser to support Cooperative Parse.

You can subclass the UDChunker class to allow your parser to support Cooperative parse. This class is available only in the C++ API.

Fundamentally, a UDChunker is a very simple parser. Like UDParser, it has the following three methods: setup(), process(), and destroy(). You must override process(); you may override the others. This class has one additional method, alignPortion(), which you must implement if you want to enable Apportioned load for your UDChunker.

Setting up and tearing down

As with UDParser, you can define initialization and cleanup code for your chunker. Vertica calls setup() before the first call to process() and destroy() after the last call to process(). Your object might be reused among multiple load sources, so make sure that setup() completely initializes all fields.

Chunking

Vertica calls process() to divide an input into chunks that can be parsed independently. The method takes an input buffer and an indicator of the input state:

  • OK: the input buffer begins at the start of or in the middle of a stream.

  • END_OF_FILE: no further data is available.

  • END_OF_PORTION: the source has reached the end of its portion. This state occurs only when using apportioned load.

If the input state is END_OF_FILE, the chunker should set the input.offset marker to input.size and return DONE. Returning INPUT_NEEDED is an error.

If the input state is OK, the chunker should read data from the input buffer and find record boundaries. If it finds the end of at least one record, it should align the input.offset marker with the byte after the end of the last record in the buffer and return CHUNK_ALIGNED. For example, if the input is "abc~def" and "~" is a record terminator, this method should set input.offset to 4, the position of "d". If process() reaches the end of the input without finding a record boundary, it should return INPUT_NEEDED.

You can divide the input into smaller chunks, but consuming all available records in the input can have better performance. For example, a chunker could scan backwards from the end of the input to find a record terminator, which might be the last of many records in the input, and return it all as one chunk without scanning through the rest of the input.

If the input state is END_OF_PORTION, the chunker should behave as it does for an input state of OK, except that it should also set a flag. When called again, it should find the first record in the next portion and align the chunk to that record.

The input data can contain null bytes, if the source file contains them. The input argument is not automatically null-terminated.

The process() method must not block indefinitely. If this method cannot proceed for an extended period of time, it should return KEEP_GOING. Failing to return KEEP_GOING has several consequences, such as preventing your user from being able to cancel the query.

See C++ example: delimited parser and chunker for an example of the process() method using chunking.
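
As a simplified illustration, the following sketch aligns chunks on newline-terminated records; it omits the END_OF_PORTION flag handling described above:

virtual StreamState process(ServerInterface &srvInterface,
                            DataBuffer &input, InputState input_state)
{
    if (input_state == END_OF_FILE) {
        // Everything that remains belongs to the final chunk.
        input.offset = input.size;
        return DONE;
    }

    // Scan backwards from the end of the buffer for the last record terminator.
    for (size_t pos = input.size; pos > input.offset; pos--) {
        if (input.buf[pos - 1] == '\n') {
            // Hand everything up to and including the last complete record
            // to the parser as one chunk.
            input.offset = pos;
            return CHUNK_ALIGNED;
        }
    }

    // No complete record in the buffer yet; ask for more data.
    return INPUT_NEEDED;
}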

Aligning portions

If your chunker supports apportioned load, implement the alignPortion() method. Vertica calls this method one or more times, before calling process(), to align the input offset with the beginning of the first complete chunk in the portion. The method takes an input buffer and an indicator of the input state:

  • START_OF_PORTION: the beginning of the buffer corresponds to the start of the portion. You can use the getPortion() method to access the offset and size of the portion.

  • OK: the input buffer is in the middle of a portion.

  • END_OF_PORTION: the end of the buffer corresponds to the end of the portion or beyond the end of a portion.

  • END_OF_FILE: no further data is available.

The method should scan from the beginning of the buffer to the start of the first complete record. It should set input.offset to this position and return one of the following values (a sketch appears after this list):

  • DONE, if it found a chunk. input.offset is the first byte of the chunk.

  • INPUT_NEEDED, if the input buffer does not contain the start of any chunk. It is an error to return this from an input state of END_OF_FILE.

  • REJECT, if the portion (not buffer) does not contain the start of any chunk.
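
The following is a minimal sketch of alignPortion() for newline-terminated records, consistent with the rules above; a production implementation would also handle the first portion of a source, which starts on a record boundary without a preceding terminator:

virtual StreamState alignPortion(ServerInterface &srvInterface,
                                 DataBuffer &input, InputState state)
{
    // Scan forward for the first record terminator; the first complete
    // record in this portion starts on the byte after it.
    for (size_t pos = input.offset; pos < input.size; pos++) {
        if (input.buf[pos] == '\n') {
            input.offset = pos + 1;
            return DONE;
        }
    }

    // No record boundary found. If no more data is coming for this portion,
    // the portion contains no chunk start; otherwise ask for more input.
    return (state == END_OF_FILE || state == END_OF_PORTION) ? REJECT : INPUT_NEEDED;
}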

API

The UDChunker API provides the following methods for extension by subclasses:

virtual void setup(ServerInterface &srvInterface,
            SizedColumnTypes &returnType);

virtual StreamState alignPortion(ServerInterface &srvInterface,
            DataBuffer &input, InputState state);

virtual StreamState process(ServerInterface &srvInterface,
            DataBuffer &input, InputState input_state)=0;

virtual void cancel(ServerInterface &srvInterface);

virtual void destroy(ServerInterface &srvInterface,
            SizedColumnTypes &returnType);

3.3 - ParserFactory class

If you write a parser, you must also write a factory to produce parser instances.

If you write a parser, you must also write a factory to produce parser instances. To do so, subclass the ParserFactory class.

Parser factories are singletons. Your subclass must be stateless, with no fields containing data. Your subclass also must not modify any global variables.

The ParserFactory class defines the following methods. Your subclass must override the prepare() method. It may override the other methods.

Setting up

Vertica calls plan() once on the initiator node to perform the following tasks:

  • Check any parameters that have been passed from the function call in the COPY statement and provide error messages if there are any issues. You read the parameters by getting a ParamReader object from the instance of ServerInterface passed into your plan() method.

  • Store any information that the individual hosts need in order to parse the data. For example, you could store parameters in the PlanContext instance passed in through the planCtxt parameter. The plan() method runs only on the initiator node, and the prepare() method runs on each host that parses data. Therefore, this object is the only means of communication between them.

    You store data in the PlanContext by getting a ParamWriter object from the getWriter() method. You then write parameters by calling methods on the ParamWriter such as setString.

Creating parsers

Vertica calls prepare() on each node to create and initialize your parser, using data stored by the plan() method.

Defining parameters

Implement getParameterTypes() to define the names and types of parameters that your parser uses. Vertica uses this information to warn callers about unknown or missing parameters. Vertica ignores unknown parameters and uses default values for missing parameters. While you should define the types and parameters for your function, you are not required to override this method.

Defining parser outputs

Implement getParserReturnType() to define the data types of the table columns that the parser outputs. If applicable, getParserReturnType() also defines the size, precision, or scale of the data types. Usually, this method reads data types of the output table from the argType and perColumnParamReader arguments and verifies that it can output the appropriate data types. If getParserReturnType() is prepared to output the data types, it calls methods on the SizedColumnTypes object passed in the returnType argument. In addition to the data type of the output column, your method should also specify any additional information about the column's data type (a sketch appears after this list):

  • For binary and string data types (such as CHAR, VARCHAR, and LONG VARBINARY), specify the maximum length.

  • For NUMERIC types, specify the precision and scale.

  • For Time/Timestamp types (with or without time zone), specify the precision (-1 means unspecified).

  • For ARRAY types, specify the maximum number of elements.

  • For all other types, no length or precision specification is required.
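
For example, assuming a hypothetical target table defined as (name VARCHAR(64), price NUMERIC(10,2), qty INTEGER), getParserReturnType() might look like this sketch:

virtual void getParserReturnType(ServerInterface &srvInterface,
        PerColumnParamReader &perColumnParamReader,
        PlanContext &planCtxt,
        const SizedColumnTypes &argTypes,
        SizedColumnTypes &returnType)
{
    // Echo the output table's column names while declaring sizes explicitly.
    returnType.addVarchar(64, argTypes.getColumnName(0));    // maximum length for string types
    returnType.addNumeric(10, 2, argTypes.getColumnName(1)); // precision and scale
    returnType.addInt(argTypes.getColumnName(2));            // no size information needed
}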

Supporting cooperative parse

To support Cooperative parse, implement prepareChunker() and return an instance of your UDChunker subclass. If isChunkerApportionable() returns true, then it is an error for this method to return null.

Cooperative parse is currently supported only in the C++ API.

Supporting apportioned load

To support Apportioned load, your parser, chunker, or both must support apportioning. To indicate that the parser can apportion a load, implement isParserApportionable() and return true. To indicate that the chunker can apportion a load, implement isChunkerApportionable() and return true.

The isChunkerApportionable() method takes a ServerInterface as an argument, so you have access to the parameters supplied in the COPY statement. You might need this information if the user can specify a record delimiter, for example. Return true from this method if and only if the factory can create a chunker for this input.

API

The ParserFactory API provides the following methods for extension by subclasses:

virtual void plan(ServerInterface &srvInterface, PerColumnParamReader &perColumnParamReader, PlanContext &planCtxt);

virtual UDParser * prepare(ServerInterface &srvInterface, PerColumnParamReader &perColumnParamReader,
            PlanContext &planCtxt, const SizedColumnTypes &returnType)=0;

virtual void getParameterType(ServerInterface &srvInterface, SizedColumnTypes &parameterTypes);

virtual void getParserReturnType(ServerInterface &srvInterface, PerColumnParamReader &perColumnParamReader,
            PlanContext &planCtxt, const SizedColumnTypes &argTypes,
            SizedColumnTypes &returnType);

virtual bool isParserApportionable();

// C++ API only:
virtual bool isChunkerApportionable(ServerInterface &srvInterface);

virtual UDChunker * prepareChunker(ServerInterface &srvInterface, PerColumnParamReader &perColumnParamReader,
            PlanContext &planCtxt, const SizedColumnTypes &returnType);

If you are using Apportioned load to divide a single input into multiple load streams, implement isParserApportionable() and/or isChunkerApportionable() and return true. Returning true from these methods does not guarantee that Vertica will apportion the load. However, returning false from both indicates that it will not try to do so.

If you are using Cooperative parse, implement prepareChunker() and return an instance of your UDChunker subclass. Cooperative parse is supported only for the C++ API.

Vertica calls the prepareChunker() method only for unfenced functions. This method is not available when you use the function in fenced mode.

If you want your chunker to be available for apportioned load, implement isChunkerApportionable() and return true.

After creating your ParserFactory, you must register it with the RegisterFactory macro.

The ParserFactory API provides the following methods for extension by subclasses:

public void plan(ServerInterface srvInterface, PerColumnParamReader perColumnParamReader, PlanContext planCtxt)
    throws UdfException;

public abstract UDParser prepare(ServerInterface srvInterface, PerColumnParamReader perColumnParamReader,
                PlanContext planCtxt, SizedColumnTypes returnType)
    throws UdfException;

public void getParameterType(ServerInterface srvInterface, SizedColumnTypes parameterTypes);

public void getParserReturnType(ServerInterface srvInterface, PerColumnParamReader perColumnParamReader,
                PlanContext planCtxt, SizedColumnTypes argTypes, SizedColumnTypes returnType)
    throws UdfException;

The ParserFactory API provides the following methods for extension by subclasses:

class PyParserFactory(vertica_sdk.ParserFactory):
    def __init__(self):
        pass
    def plan(self):
        pass
    def prepareUDSources(self, srvInterface):
        # Implement this method to create and return PyUDParser instances.
        pass

3.4 - C++ example: BasicIntegerParser

The BasicIntegerParser example parses a string of integers separated by non-numeric characters.

The BasicIntegerParser example parses a string of integers separated by non-numeric characters. For a version of this parser that uses continuous load, see C++ example: ContinuousIntegerParser.

Loading and using the example

Load and use the BasicIntegerParser example as follows.

=> CREATE LIBRARY BasicIntegerParserLib AS '/home/dbadmin/BIP.so';

=> CREATE PARSER BasicIntegerParser AS
LANGUAGE 'C++' NAME 'BasicIntegerParserFactory' LIBRARY BasicIntegerParserLib;

=> CREATE TABLE t (i integer);

=> COPY t FROM stdin WITH PARSER BasicIntegerParser();
0
1
2
3
4
5
\.

Implementation

The BasicIntegerParser class implements only the process() method from the API. (It also implements a helper method for type conversion.) This method processes each line of input, looking for numbers on each line. When it advances to a new line it moves the input.offset marker and checks the input state. It then writes the output.

    virtual StreamState process(ServerInterface &srvInterface, DataBuffer &input,
                InputState input_state) {
        // WARNING: This implementation is not trying for efficiency.
        // It is trying for simplicity, for demonstration purposes.

        size_t start = input.offset;
        const size_t end = input.size;

        do {
            bool found_newline = false;
            size_t numEnd = start;
            for (; numEnd < end; numEnd++) {
                if (input.buf[numEnd] < '0' || input.buf[numEnd] > '9') {
                    found_newline = true;
                    break;
                }
            }

            if (!found_newline) {
                input.offset = start;
                if (input_state == END_OF_FILE) {
                    // If we're at end-of-file,
                    // emit the last integer (if any) and return DONE.
                    if (start != end) {
                        writer->setInt(0, strToInt(input.buf + start, input.buf + numEnd));
                        writer->next();
                    }
                    return DONE;
                } else {
                    // Otherwise, we need more data.
                    return INPUT_NEEDED;
                }
            }

            writer->setInt(0, strToInt(input.buf + start, input.buf + numEnd));
            writer->next();

            start = numEnd + 1;
        } while (true);
    }
};

In the factory, the plan() method is a no-op; there are no parameters to check. The prepare() method instantiates the parser using the macro provided by the SDK:

    virtual UDParser* prepare(ServerInterface &srvInterface,
            PerColumnParamReader &perColumnParamReader,
            PlanContext &planCtxt,
            const SizedColumnTypes &returnType) {

        return vt_createFuncObject<BasicIntegerParser>(srvInterface.allocator);
    }

The getParserReturnType() method declares the single output:

    virtual void getParserReturnType(ServerInterface &srvInterface,
            PerColumnParamReader &perColumnParamReader,
            PlanContext &planCtxt,
            const SizedColumnTypes &argTypes,
            SizedColumnTypes &returnType) {
        // We only and always have a single integer column
        returnType.addInt(argTypes.getColumnName(0));
    }

As for all UDxs written in C++, the example ends by registering its factory:

RegisterFactory(BasicIntegerParserFactory);

3.5 - C++ example: ContinuousIntegerParser

The ContinuousIntegerParser example is a variation of BasicIntegerParser.

The ContinuousIntegerParser example is a variation of BasicIntegerParser. Both examples parse integers from input strings. ContinuousIntegerParser uses Continuous load to read data.

Loading and using the example

Load the ContinuousIntegerParser example as follows.

=> CREATE LIBRARY ContinuousIntegerParserLib AS '/home/dbadmin/CIP.so';

=> CREATE PARSER ContinuousIntegerParser AS
LANGUAGE 'C++' NAME 'ContinuousIntegerParserFactory'
LIBRARY ContinuousIntegerParserLib;

Use it in the same way that you use BasicIntegerParser. See C++ example: BasicIntegerParser.

Implementation

ContinuousIntegerParser is a subclass of ContinuousUDParser. Subclasses of ContinuousUDParser place the processing logic in the run() method.

    virtual void run() {

        // This parser assumes a single-column input, and
        // a stream of ASCII integers split by non-numeric characters.
        size_t pos = 0;
        size_t reserved = cr.reserve(pos+1);
        while (!cr.isEof() || reserved == pos + 1) {
            while (reserved == pos + 1 && isdigit(*ptr(pos))) {
                pos++;
                reserved = cr.reserve(pos + 1);
            }

            std::string st(ptr(), pos);
            writer->setInt(0, strToInt(st));
            writer->next();

            while (reserved == pos + 1 && !isdigit(*ptr(pos))) {
                pos++;
                reserved = cr.reserve(pos + 1);
            }
            cr.seek(pos);
            pos = 0;
            reserved = cr.reserve(pos + 1);
        }
    }
};

For a more complex example of a ContinuousUDParser, see ExampleDelimitedParser in the examples. (See Downloading and running UDx example code.) ExampleDelimitedParser uses a chunker; see C++ example: delimited parser and chunker.

3.6 - Java example: numeric text

This NumericTextParser example parses integer values spelled out in words rather than digits (for example, "one two three" for one hundred twenty-three).

This NumericTextParser example parses integer values spelled out in words rather than digits (for example, "one two three" for one hundred twenty-three). The parser:

  • Accepts a single parameter to set the character that separates columns in a row of data. The separator defaults to the pipe (|) character.

  • Ignores extra spaces and the capitalization of the words used to spell out the digits.

  • Recognizes the digits using the following words: zero, one, two, three, four, five, six, seven, eight, nine.

  • Assumes that the words spelling out an integer are separated by at least one space.

  • Rejects any row of data that cannot be completely parsed into integers.

  • Generates an error if the output table has a non-integer column.

Loading and using the example

Load and use the parser as follows:

=> CREATE LIBRARY JavaLib AS '/home/dbadmin/JavaLib.jar' LANGUAGE 'JAVA';
CREATE LIBRARY

=> CREATE PARSER NumericTextParser AS LANGUAGE 'java'
->    NAME 'com.myCompany.UDParser.NumericTextParserFactory'
->    LIBRARY JavaLib;
CREATE PARSER FUNCTION
=> CREATE TABLE t (i INTEGER);
CREATE TABLE
=> COPY t FROM STDIN WITH PARSER NumericTextParser();
Enter data to be copied followed by a newline.
End with a backslash and a period on a line by itself.
>> One
>> Two
>> One Two Three
>> \.
=> SELECT * FROM t ORDER BY i;
  i
-----
   1
   2
 123
(3 rows)

=> DROP TABLE t;
DROP TABLE
=> -- Parse multi-column input
=> CREATE TABLE t (i INTEGER, j INTEGER);
CREATE TABLE
=> COPY t FROM stdin WITH PARSER NumericTextParser();
Enter data to be copied followed by a newline.
End with a backslash and a period on a line by itself.
>> One | Two
>> Two | Three
>> One Two Three | four Five Six
>> \.
=> SELECT * FROM t ORDER BY i;
  i  |  j
-----+-----
   1 |   2
   2 |   3
 123 | 456
(3 rows)

=> TRUNCATE TABLE t;
TRUNCATE TABLE
=> -- Use alternate separator character
=> COPY t FROM STDIN WITH PARSER NumericTextParser(separator='*');
Enter data to be copied followed by a newline.
End with a backslash and a period on a line by itself.
>> Five * Six
>> seven * eight
>> nine * one zero
>> \.
=> SELECT * FROM t ORDER BY i;
 i | j
---+----
 5 |  6
 7 |  8
 9 | 10
(3 rows)

=> TRUNCATE TABLE t;
TRUNCATE TABLE

=> -- Rows containing data that does not parse into digits are rejected.
=> DROP TABLE t;
DROP TABLE
=> CREATE TABLE t (i INTEGER);
CREATE TABLE
=> COPY t FROM STDIN WITH PARSER NumericTextParser();
Enter data to be copied followed by a newline.
End with a backslash and a period on a line by itself.
>> One Zero Zero
>> Two Zero Zero
>> Three Zed Zed
>> Four Zero Zero
>> Five Zed Zed
>> \.
SELECT * FROM t ORDER BY i;
  i
-----
 100
 200
 400
(3 rows)

=> -- Generate an error by trying to copy into a table with a non-integer column
=> DROP TABLE t;
DROP TABLE
=> CREATE TABLE t (i INTEGER, j VARCHAR);
CREATE TABLE
=> COPY t FROM STDIN WITH PARSER NumericTextParser();
vsql:UDParse.sql:94: ERROR 3399:  Failure in UDx RPC call
InvokeGetReturnTypeParser(): Error in User Defined Object [NumericTextParser],
error code: 0
com.vertica.sdk.UdfException: Column 2 of output table is not an Int
        at com.myCompany.UDParser.NumericTextParserFactory.getParserReturnType
        (NumericTextParserFactory.java:70)
        at com.vertica.udxfence.UDxExecContext.getReturnTypeParser(
        UDxExecContext.java:1464)
        at com.vertica.udxfence.UDxExecContext.getReturnTypeParser(
        UDxExecContext.java:768)
        at com.vertica.udxfence.UDxExecContext.run(UDxExecContext.java:236)
        at java.lang.Thread.run(Thread.java:662)

Parser implementation

The following code implements the parser.

package com.myCompany.UDParser;

import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

import com.vertica.sdk.DataBuffer;
import com.vertica.sdk.DestroyInvocation;
import com.vertica.sdk.RejectedRecord;
import com.vertica.sdk.ServerInterface;
import com.vertica.sdk.State.InputState;
import com.vertica.sdk.State.StreamState;
import com.vertica.sdk.StreamWriter;
import com.vertica.sdk.UDParser;
import com.vertica.sdk.UdfException;

public class NumericTextParser extends UDParser {

    private String separator; // Holds column separator character

    // List of strings that we accept as digits.
    private List<String> numbers = Arrays.asList("zero", "one",
            "two", "three", "four", "five", "six", "seven",
            "eight", "nine");

    // Hold information about the last rejected row.
    private String rejectedReason;
    private String rejectedRow;

    // Constructor gets the separator character from the Factory's prepare()
    // method.
    public NumericTextParser(String sepparam) {
        super();
        this.separator = sepparam;
    }

    // Called to perform the actual work of parsing. Gets a buffer of bytes
    // to turn into tuples.
    @Override
    public StreamState process(ServerInterface srvInterface, DataBuffer input,
            InputState input_state) throws UdfException, DestroyInvocation {

        int i=input.offset; // Current position in the input buffer
        // Flag to indicate whether we just found the end of a row.
        boolean lastCharNewline = false;
        // Buffer to hold the row of data being read.
        StringBuffer line = new StringBuffer();

        //Continue reading until end of buffer.
        for(; i < input.buf.length; i++){
            // Loop through input until we find a linebreak: marks end of row
            char inchar = (char) input.buf[i];
            // Note that this isn't a robust way to find rows. It should
            // accept a user-defined row separator. Also, the following
            // assumes ASCII line break characters, which isn't a good idea
            // in the UTF world. But it is good enough for this simple example.
            if (inchar != '\n' && inchar != '\r') {
                // Keep adding to a line buffer until a full row of data is read
                line.append(inchar);
                lastCharNewline = false; // Last character not a new line
            } else {
                // Found a line break. Process the row.
                lastCharNewline = true; // indicate we got a complete row
                // Update the position in the input buffer. This is updated
                // whether the row is successfully processed or not.
                input.offset = i+1;
                // Call processRow to extract values and write tuples to the
                // output. Returns false if there was an error.
                if (!processRow(line)) {
                    // Processing row failed. Save bad row to rejectedRow field
                    // and return to caller indicating a rejected row.
                    rejectedRow = line.toString();
                    // Update position where we processed the data.
                    return StreamState.REJECT;
                }
                line.delete(0, line.length()); // clear row buffer
            }
        }

        // At this point, process() has finished processing the input buffer.
        // There are two possibilities: need to get more data
        // from the input stream to finish processing, or there is
        // no more data to process. If at the end of the input stream and
        // the row was not terminated by a linefeed, it may need
        // to process the last row.

        if (input_state == InputState.END_OF_FILE && lastCharNewline) {
            // End of input and it ended on a newline. Nothing more to do
            return StreamState.DONE;
        } else if (input_state == InputState.END_OF_FILE && !lastCharNewline) {
            // At end of input stream but didn't get a final newline. Need to
            // process the final row that was read in, then exit for good.
            if (line.length() == 0) {
                // Nothing to process. Done parsing.
                return StreamState.DONE;
            }
            // Need to parse the last row, not terminated by a linefeed. This
            // can occur if the file being read didn't have a final line break.
            if (processRow(line)) {
                return StreamState.DONE;
            } else {
                // Processing last row failed. Save bad row to rejectedRow field
                // and return to caller indicating a rejected row.
                rejectedRow = line.toString();
                // Tell Vertica the entire buffer was processed so it won't
                // call again to have the line processed.
                input.offset = input.buf.length;
                return StreamState.REJECT;
            }
        } else {
            // Stream is not fully read, so tell Vertica to send more. If
            // process() didn't get a complete row before it hit the end of the
            // input buffer, it will end up re-processing that segment again
            // when more data is added to the buffer.
            return StreamState.INPUT_NEEDED;
        }
    }

    // Breaks a row into columns, then parses the content of the
    // columns. Returns false if there was an error parsing the
    // row, in which case it sets the rejected row to the input
    // line. Returns true if the row was successfully read.
    private boolean processRow(StringBuffer line)
                                throws UdfException, DestroyInvocation {
        String[] columns = line.toString().split(Pattern.quote(separator));
        // Loop through the columns, decoding their contents
        for (int col = 0; col < columns.length; col++) {
            // Call decodeColumn to extract value from this column
            Integer colval = decodeColumn(columns[col]);
            if (colval == null) {
                // Could not parse one of the columns. Indicate row should
                // be rejected.
                return false;
            }
            // Column parsed OK. Write it to the output. writer is a field
            // provided by the parent class. Since this parser only accepts
            // integers, there is no need to verify that data type of the parsed
            // data matches the data type of the column being written. In your
            // UDParsers, you may want to perform this verification.
            writer.setLong(col,colval);
        }
        // Done with the row of data. Advance output to next row.

        // Note that this example does not verify that all of the output columns
        // have values assigned to them. If there are missing values at the
        // end of a row, they automatically get assigned a default value
        // (0 for integers). This isn't a robust solution. Your UDParser
        // should perform checks here to handle this situation and set values
        // (such as null) when appropriate.
        writer.next();
        return true; // Successfully processed the row.
    }

    // Gets a string with text numerals, i.e. "One Two Five Seven" and turns
    // it into an integer value, i.e. 1257. Returns null if the string could not
    // be parsed completely into numbers.
    private Integer decodeColumn(String text) {
        int value = 0; // Hold the value being parsed.

        // Split string into individual words. Eat extra spaces.
        String[] words = text.toLowerCase().trim().split("\\s+");

        // Loop through the words, matching them against the list of
        // digit strings.
        for (int i = 0; i < words.length; i++) {
            if (numbers.contains(words[i])) {
                // Matched a digit. Add it to the value.
                int digit = numbers.indexOf(words[i]);
                value = (value * 10) + digit;
            } else {
                // The string didn't match one of the accepted string values
                // for digits. Need to reject the row. Set the rejected
                // reason string here so it can be incorporated into the
                // rejected reason object.
                //
                // Note that this example does not handle null column values.
                // In most cases, you want to differentiate between an
                // unparseable column value and a missing piece of input
                // data. This example just rejects the row if there is a missing
                // column value.
                rejectedReason = String.format(
                        "Could not parse '%s' into a digit",words[i]);
                return null;
            }
        }
        return value;
    }

    // Vertica calls this method if the parser rejected a row of data
    // to find out what went wrong and add to the proper logs. Just gathers
    // information stored in fields and returns it in an object.
    @Override
    public RejectedRecord getRejectedRecord() throws UdfException {
        return new RejectedRecord(rejectedReason,rejectedRow.toCharArray(),
                rejectedRow.length(), "\n");
    }
}

ParserFactory implementation

The following code implements the parser factory.

NumericTextParser accepts a single optional parameter named separator. This parameter is defined in the getParameterType() method, and the plan() method stores its value. NumericTextParser outputs only integer values. Therefore, if the output table contains a column whose data type is not integer, the getParserReturnType() method throws an exception.

package com.myCompany.UDParser;

import java.util.regex.Pattern;

import com.vertica.sdk.ParamReader;
import com.vertica.sdk.ParamWriter;
import com.vertica.sdk.ParserFactory;
import com.vertica.sdk.PerColumnParamReader;
import com.vertica.sdk.PlanContext;
import com.vertica.sdk.ServerInterface;
import com.vertica.sdk.SizedColumnTypes;
import com.vertica.sdk.UDParser;
import com.vertica.sdk.UdfException;
import com.vertica.sdk.VerticaType;

public class NumericTextParserFactory extends ParserFactory {

    // Called once on the initiator host to check the parameters and set up the
    // context data that the hosts performing the processing will need later.
    @Override
    public void plan(ServerInterface srvInterface,
                PerColumnParamReader perColumnParamReader,
                PlanContext planCtxt) {

        String separator = "|"; // assume separator is pipe character

        // See if a parameter was given for column separator
        ParamReader args = srvInterface.getParamReader();
        if (args.containsParameter("separator")) {
            separator = args.getString("separator");
            if (separator.length() > 1) {
                throw new UdfException(0,
                        "Separator parameter must be a single character");
            }
            if (separator.matches("[a-zA-Z]")) {
                throw new UdfException(0,
                        "Separator parameter cannot be a letter");
            }
        }

        // Save separator character in the Plan Data
        ParamWriter context = planCtxt.getWriter();
        context.setString("separator", separator);
    }

    // Define the data types of the output table that the parser will return.
    // Mainly, this just ensures that all of the columns in the target
    // table of the data load are integers.
    @Override
    public void getParserReturnType(ServerInterface srvInterface,
                PerColumnParamReader perColumnParamReader,
                PlanContext planCtxt,
                SizedColumnTypes argTypes,
                SizedColumnTypes returnType) {

        // Get access to the output table's columns
        for (int i = 0; i < argTypes.getColumnCount(); i++ ) {
            if (argTypes.getColumnType(i).isInt()) {
                // Column is integer... add it to the output
                 returnType.addInt(argTypes.getColumnName(i));
            } else {
                // Column isn't an int, so throw an exception.
                // This is technically not necessary, since the
                // UDx framework automatically errors out when it sees a
                // discrepancy between the type in the target table and the
                // types declared by this method. Throwing this exception will
                // provide a clearer error message to the user.
                String message = String.format(
                    "Column %d of output table is not an Int", i + 1);
                throw new UdfException(0, message);
            }
        }
    }

    // Instantiate the UDParser subclass named NumericTextParser, passing the
    // separator character as a parameter to the constructor.
    @Override
    public UDParser prepare(ServerInterface srvInterface,
            PerColumnParamReader perColumnParamReader, PlanContext planCtxt,
            SizedColumnTypes returnType) throws UdfException {
        // Get the separator character from the context
        String separator = planCtxt.getReader().getString("separator");
        return new NumericTextParser(separator);
    }

    // Describe the parameters accepted by this parser.
    @Override
    public void getParameterType(ServerInterface srvInterface,
             SizedColumnTypes parameterTypes) {
        parameterTypes.addVarchar(1, "separator");
    }
}

3.7 - Java example: JSON parser

The JSON Parser consumes a stream of JSON objects.

The JSON Parser consumes a stream of JSON objects. Each object must be well formed and on a single line in the input. Use line breaks to delimit the objects. The parser uses the field names as keys in a map, which become column names in the table. You can find the code for this example in /opt/vertica/packages/flextable/examples. This directory also contains an example data file.

This example uses the setRowFromMap() method to write data.

Loading and using the example

Load the library and define the JSON parser, using the third-party library (gson-2.2.4.jar) as follows. See the comments in JsonParser.java for a download URL:

=> CREATE LIBRARY json
-> AS '/opt/vertica/packages/flextable/examples/java/output/json.jar'
-> DEPENDS '/opt/vertica/bin/gson-2.2.4.jar' language 'java';
CREATE LIBRARY

=> CREATE PARSER JsonParser AS LANGUAGE 'java'
-> NAME 'com.vertica.flex.JsonParserFactory' LIBRARY json;
CREATE PARSER FUNCTION

You can now define a table and then use the JSON parser to load data into it, as follows:

=> CREATE TABLE mountains(name varchar(64), type varchar(32), height integer);
CREATE TABLE

=> COPY mountains FROM '/opt/vertica/packages/flextable/examples/mountains.json'
-> WITH PARSER JsonParser();
-[ RECORD 1 ]--
Rows Loaded | 2

=> SELECT * from mountains;
-[ RECORD 1 ]--------
name   | Everest
type   | mountain
height | 29029
-[ RECORD 2 ]--------
name   | Mt St Helens
type   | volcano
height |

The data file contains a value (hike_safety) that was not loaded because the table definition did not include that column. The data file follows:

{ "name": "Everest", "type":"mountain", "height": 29029, "hike_safety": 34.1  }
{ "name": "Mt St Helens", "type": "volcano", "hike_safety": 15.4 }

Implementation

The following code shows the process() method from JsonParser.java. The parser attempts to read the input into a Map. If the read is successful, the JSON Parser calls setRowFromMap():

    @Override
    public StreamState process(ServerInterface srvInterface, DataBuffer input,
            InputState inputState) throws UdfException, DestroyInvocation {
        clearReject();
        StreamWriter output = getStreamWriter();

        while (input.offset < input.buf.length) {
            ByteBuffer lineBytes = consumeNextLine(input, inputState);

            if (lineBytes == null) {
                return StreamState.INPUT_NEEDED;
            }

            String lineString = StringUtils.newString(lineBytes);

            try {
                Map<String,Object> map = gson.fromJson(lineString, parseType);

                if (map == null) {
                    continue;
                }

                output.setRowFromMap(map);
                // No overrides needed, so just call next() here.
                output.next();
            } catch (Exception ex) {
                setReject(lineString, ex);
                return StreamState.REJECT;
            }
        }

        // All buffered input has been consumed. If the source is exhausted,
        // the load is done; otherwise ask Vertica for more data. (This return
        // is supplied here so the excerpt is complete; see JsonParser.java for
        // the full implementation.)
        if (inputState == InputState.END_OF_FILE) {
            return StreamState.DONE;
        }
        return StreamState.INPUT_NEEDED;
    }

The factory, JsonParserFactory.java, instantiates and returns a parser in the prepare() method. No additional setup is required.

3.8 - C++ example: delimited parser and chunker

The ExampleDelimitedUDChunker class divides an input at delimiter characters.

The ExampleDelimitedUDChunker class divides an input at delimiter characters. You can use this chunker with any parser that understands delimited input. ExampleDelimitedParser is a ContinuousUDParser subclass that uses this chunker.

Loading and using the example

Load and use the example as follows.

=> CREATE LIBRARY ExampleDelimitedParserLib AS '/home/dbadmin/EDP.so';

=> CREATE PARSER ExampleDelimitedParser AS
    LANGUAGE 'C++' NAME 'DelimitedParserFrameworkExampleFactory'
    LIBRARY ExampleDelimitedParserLib;

=> COPY t FROM stdin WITH PARSER ExampleDelimitedParser();
0
1
2
3
4
5
6
7
8
9
\.

Chunker implementation

This chunker supports apportioned load. The alignPortion() method finds the beginning of the first complete record in the current portion and aligns the input buffer with it. The record terminator is passed as an argument and set in the constructor.

StreamState ExampleDelimitedUDChunker::alignPortion(
            ServerInterface &srvInterface,
            DataBuffer &input, InputState state)
{
    /* find the first record terminator.  Its record belongs to the previous portion */
    void *buf = reinterpret_cast<void *>(input.buf + input.offset);
    void *term = memchr(buf, recordTerminator, input.size - input.offset);

    if (term) {
        /* record boundary found.  Align to the start of the next record */
        const size_t chunkSize = reinterpret_cast<size_t>(term) - reinterpret_cast<size_t>(buf);
        input.offset += chunkSize
            + sizeof(char) /* length of record terminator */;

        /* input.offset points at the start of the first complete record in the portion */
        return DONE;
    } else if (state == END_OF_FILE || state == END_OF_PORTION) {
        return REJECT;
    } else {
        VIAssert(state == START_OF_PORTION || state == OK);
        return INPUT_NEEDED;
    }
}

The process() method has to account for chunks that span portion boundaries. If the previous call reached the end of a portion, it set a flag (pastPortion). The code begins by checking for and handling that condition. The logic is similar to that of alignPortion(), so the example calls that method to do part of the division.

StreamState ExampleDelimitedUDChunker::process(
                ServerInterface &srvInterface,
                DataBuffer &input,
                InputState input_state)
{
    const size_t termLen = 1;
    const char *terminator = &recordTerminator;

    if (pastPortion) {
        /*
         * Previous state was END_OF_PORTION, and the last chunk we will produce
         * extends beyond the portion we started with, into the next portion.
         * To be consistent with alignPortion(), that means finding the first
         * record boundary, and setting the chunk to be at that boundary.
         * Fortunately, this logic is identical to aligning the portion (with
         * some slight accounting for END_OF_FILE)!
         */
        const StreamState findLastTerminator = alignPortion(srvInterface, input);

        switch (findLastTerminator) {
            case DONE:
                return DONE;
            case INPUT_NEEDED:
                if (input_state == END_OF_FILE) {
                    /* there is no more input where we might find a record terminator */
                    input.offset = input.size;
                    return DONE;
                }
                return INPUT_NEEDED;
            default:
                VIAssert("Invalid return state from alignPortion()");
        }
        return findLastTerminator;
    }

Now the method scans for the record terminator. If this block of input ends at a portion boundary (END_OF_PORTION), the method sets the pastPortion flag so that the next call knows the chunk extends into the next portion.

    size_t ret = input.offset, term_index = 0;
    for (size_t index = input.offset; index < input.size; ++index) {
        const char c = input.buf[index];
        if (c == terminator[term_index]) {
            ++term_index;
            if (term_index == termLen) {
                ret = index + 1;
                term_index = 0;
            }
            continue;
        } else if (term_index > 0) {
            index -= term_index;
        }

        term_index = 0;
    }

    if (input_state == END_OF_PORTION) {
        /*
         * Regardless of whether or not a record was found, the next chunk will extend
         * into the next portion.
         */
        pastPortion = true;
    }

Finally, process() moves the input offset and returns.

    // if we were able to find some rows, move the offset to point at the start of the next (potential) row, or end of block
    if (ret > input.offset) {
        input.offset = ret;
        return CHUNK_ALIGNED;
    }

    if (input_state == END_OF_FILE) {
        input.offset = input.size;
        return DONE;
    }

    return INPUT_NEEDED;
}

Factory implementation

The file ExampleDelimitedParser.cpp defines a factory that uses this UDChunker. The chunker supports apportioned load, so the factory implements isChunkerApportionable():

    virtual bool isChunkerApportionable(ServerInterface &srvInterface) {
        ParamReader params = srvInterface.getParamReader();
        if (params.containsParameter("disable_chunker") && params.getBoolRef("disable_chunker")) {
            return false;
        } else {
            return true;
        }
    }

The prepareChunker() method creates the chunker:

    virtual UDChunker* prepareChunker(ServerInterface &srvInterface,
                                      PerColumnParamReader &perColumnParamReader,
                                      PlanContext &planCtxt,
                                      const SizedColumnTypes &returnType)
    {
        ParamReader params = srvInterface.getParamReader();
        if (params.containsParameter("disable_chunker") && params.getBoolRef("disable_chunker")) {
            return NULL;
        }

        std::string recordTerminator("\n");

        ParamReader args(srvInterface.getParamReader());
        if (args.containsParameter("record_terminator")) {
            recordTerminator = args.getStringRef("record_terminator").str();
        }

        return vt_createFuncObject<ExampleDelimitedUDChunker>(srvInterface.allocator,
                recordTerminator[0]);
    }

3.9 - Python example: complex types JSON parser

The following example details a UDParser that takes a JSON object and parses it into complex types.

The following example details a UDParser that takes a JSON object and parses it into complex types. For this example, the parser assumes the input data are arrays of rows with two integer fields. The input records should be separated by newline characters. If any row fields aren't specified by the JSON input, the function parses those fields as NULL.

The source code for this UDParser also contains a factory method for parsing rows that have an integer and an array of integer fields. The implementation of the parser is independent of the return type in the factory, so you can create factories with different return types that all point to the ComplexJsonParser() class in the prepare() method. The complete source code is in /opt/vertica/sdk/examples/python/UDParsers.py.

Loading and using the example

Load the library and create the parser as follows:


=> CREATE OR REPLACE LIBRARY UDParsers AS '/home/dbadmin/examples/python/UDParsers.py' LANGUAGE 'Python';

=> CREATE PARSER ComplexJsonParser AS LANGUAGE 'Python' NAME 'ArrayJsonParserFactory' LIBRARY UDParsers;

You can now define a table and then use the JSON parser to load data into it, for example:


=> CREATE TABLE orders (a bool, arr array[row(a int, b int)]);
CREATE TABLE

=> COPY orders (arr) FROM STDIN WITH PARSER ComplexJsonParser();
[]
[{"a":1, "b":10}]
[{"a":1, "b":10}, {"a":null, "b":10}]
[{"a":1, "b":10},{"a":10, "b":20}]
[{"a":1, "b":10}, {"a":null, "b":null}]
[{"a":1, "b":2}, {"a":3, "b":4}, {"a":5, "b":6}, {"a":7, "b":8}, {"a":9, "b":10}, {"a":11, "b":12}, {"a":13, "b":14}]
\.

=> SELECT * FROM orders;
a |                                  arr
--+--------------------------------------------------------------------------
  | []
  | [{"a":1,"b":10}]
  | [{"a":1,"b":10},{"a":null,"b":10}]
  | [{"a":1,"b":10},{"a":10,"b":20}]
  | [{"a":1,"b":10},{"a":null,"b":null}]
  | [{"a":1,"b":2},{"a":3,"b":4},{"a":5,"b":6},{"a":7,"b":8},{"a":9,"b":10},{"a":11,"b":12},{"a":13,"b":14}]
(6 rows)

Setup

All Python UDxs must import the Vertica SDK library. ComplexJsonParser() also requires the json library.

import vertica_sdk
import json

Factory implementation

The prepare() method instantiates and returns a parser:


def prepare(self, srvInterface, perColumnParamReader, planCtxt, returnType):
    return ComplexJsonParser()

getParserReturnType() declares that the return type must be an array of rows that each have two integer fields:


def getParserReturnType(self, srvInterface, perColumnParamReader, planCtxt, argTypes, returnType):
    fieldTypes = vertica_sdk.SizedColumnTypes.makeEmpty()
    fieldTypes.addInt('a')
    fieldTypes.addInt('b')
    returnType.addArrayType(vertica_sdk.SizedColumnTypes.makeRowType(fieldTypes, 'elements'), 64, 'arr')

Parser implementation

The process() method reads in data with an InputBuffer and then splits that input data on the newline character. The method then passes the processed data to the writeRows() method. writeRows() turns each data row into a JSON object, checks the type of that JSON object, and then writes the appropriate value or object to the output.


class ComplexJsonParser(vertica_sdk.UDParser):

    leftover = ''

    def process(self, srvInterface, input_buffer, input_state, writer):
        input_buffer.setEncoding('utf-8')

        self.count = 0
        rec = self.leftover + input_buffer.read()
        row_lst = rec.split('\n')
        self.leftover = row_lst[-1]
        self.writeRows(row_lst[:-1], writer)
        if input_state == InputState.END_OF_FILE:
            self.writeRows([self.leftover], writer)
            return StreamState.DONE
        else:
            return StreamState.INPUT_NEEDED

    def writeRows(self, str_lst, writer):
        for s in str_lst:
            stripped = s.strip()
            if len(stripped) == 0:
                return
            elif len(stripped) > 1 and stripped[0:2] == "//":
                continue
            jsonValue = json.loads(stripped)
            if type(jsonValue) is list:
                writer.setArray(0, jsonValue)
            elif jsonValue is None:
                writer.setNull(0)
            else:
                writer.setRow(0, jsonValue)
            writer.next()

4 - Load parallelism

Vertica can divide the work of loading data, taking advantage of parallelism to speed up the operation.

Vertica can divide the work of loading data, taking advantage of parallelism to speed up the operation. Vertica supports several types of parallelism:

  • Distributed load: Vertica distributes files in a multi-file load to several nodes to load in parallel, instead of loading all of them on a single node. Vertica manages distributed load; you do not need to do anything special in your UDL.

  • Cooperative parse: A source being loaded on a single node uses multi-threading to parallelize the parse. Cooperative parse divides a load at execution time, based on how threads are scheduled. You must enable cooperative parse in your parser. See Cooperative parse.

  • Apportioned load: Vertica divides a single large file or other single source into segments, which it assigns to several nodes to load in parallel. Apportioned load divides the load at planning time, based on available nodes and cores on each node. You must enable apportioned load in your source and parser. See Apportioned load.

You can support both cooperative parse and apportioned load in the same UDL. Vertica decides which to use for each load operation and might use both. See Combining cooperative parse and apportioned load.

4.1 - Cooperative parse

By default, Vertica parses a data source in a single thread on one database node.

By default, Vertica parses a data source in a single thread on one database node. You can optionally use cooperative parse to parse a source using multiple threads on a node. More specifically, data from a source passes through a chunker, which groups blocks from the source stream into logical units (chunks) that can be parsed individually; the parser then parses these chunks concurrently. Cooperative parse is available only for unfenced UDxs. (See Fenced and unfenced modes.)

To use cooperative parse, a chunker must be able to locate end-of-record markers in the input. Locating these markers might not be possible in all input formats.

Chunkers are created by parser factories. At load time, Vertica first calls the UDChunker to divide the input into chunks and then calls the UDParser to parse each chunk.

You can use cooperative parse and apportioned load independently or together. See Combining cooperative parse and apportioned load.

How Vertica divides a load

When Vertica receives data from a source, it calls the chunker's process() method repeatedly. A chunker is, essentially, a lightweight parser; instead of parsing, the process() method divides the input into chunks.

After the chunker has finished dividing the input into chunks, Vertica sends those chunks to as many parsers as are available, calling each parser's process() method.

Implementing cooperative parse

To implement cooperative parse, perform the following actions:

  • Subclass UDChunker and implement process().

  • In your ParserFactory, implement prepareChunker() to return a UDChunker.

See C++ example: delimited parser and chunker for a UDChunker that also supports apportioned load.
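
For orientation, the factory-side hook can be as small as the following sketch. The prepareChunker() signature matches the one in C++ example: delimited parser and chunker; MyChunker is a hypothetical UDChunker subclass whose constructor takes a record terminator, so treat the body as illustrative rather than authoritative.

    virtual UDChunker* prepareChunker(ServerInterface &srvInterface,
                                      PerColumnParamReader &perColumnParamReader,
                                      PlanContext &planCtxt,
                                      const SizedColumnTypes &returnType)
    {
        // MyChunker is a placeholder for your own UDChunker subclass, which
        // must implement process(). Returning NULL here would disable
        // cooperative parse for this load.
        return vt_createFuncObject<MyChunker>(srvInterface.allocator, '\n');
    }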

4.2 - Apportioned load

A parser can use more than one database node to load a single input source in parallel.

A parser can use more than one database node to load a single input source in parallel. This approach is referred to as apportioned load. Among the parsers built into Vertica, the default (delimited) parser supports apportioned load.

Apportioned load, like cooperative parse, requires an input that can be divided at record boundaries. The difference is that cooperative parse does a sequential scan to find record boundaries, while apportioned load first jumps (seeks) to a given position and then scans. Some formats, like generic XML, do not support seeking.

To use apportioned load, you must ensure that the source is reachable by all participating database nodes. You typically use apportioned load with distributed file systems.

It is possible for a parser to not support apportioned load directly but to have a chunker that supports apportioning.

You can use apportioned load and cooperative parse independently or together. See Combining cooperative parse and apportioned load.

How Vertica apportions a load

If both the parser and its source support apportioning, then you can specify that a single input is to be distributed to multiple database nodes for loading. The SourceFactory breaks the input into portions and assigns them to execution nodes. Each Portion consists of an offset into the input and a size. Vertica distributes the portions and their parameters to the execution nodes. A source factory running on each node produces a UDSource for the given portion.

The UDParser first determines where to start parsing:

  • If the portion is the first one in the input, the parser advances to the offset and begins parsing.

  • If the portion is not the first, the parser advances to the offset and then scans until it finds the end of a record. Because records can break across portions, parsing begins after the first record-end encountered.

The parser must complete a record, which might require it to read past the end of the portion. The parser is responsible for parsing all records that begin in the assigned portion, regardless of where they end. Most of this work occurs within the process() method of the parser.

Sometimes, a portion contains nothing to be parsed by its assigned node. For example, suppose you have a record that begins in portion 1, runs through all of portion 2, and ends in portion 3. The parser assigned to portion 1 parses the record, and the parser assigned to portion 3 starts after that record. The parser assigned to portion 2, however, has no record starting within its portion.

If the load also uses Cooperative parse, then after apportioning the load and before parsing, Vertica divides portions into chunks for parallel loading.

Implementing apportioned load

To implement apportioned load, perform the following actions in the source, the parser, and their factories.

In your SourceFactory subclass:

  • Implement isSourceApportionable() and return true.

  • Implement plan() to determine portion size, designate portions, and assign portions to execution nodes. To assign portions to particular executors, pass the information using the parameter writer on the plan context (PlanContext::getWriter()).

  • Implement prepareUDSources(). Vertica calls this method on each execution node with the plan context created by the factory. This method returns the UDSource instances to be used for this node's assigned portions.

  • If sources can take advantage of parallelism, you can implement getDesiredThreads() to request a number of threads for each source. See SourceFactory class for more information about this method.

In your UDSource subclass, implement process() as you would for any other source, using the assigned portion. You can retrieve this portion with getPortion().

In your ParserFactory subclass:

  • Implement isParserApportionable() and return true.

  • If your parser uses a UDChunker that supports apportioned load, implement isChunkerApportionable().

In your UDParser subclass:

  • Write your UDParser subclass to operate on portions rather than whole sources. You can do so by handling the stream states PORTION_START and PORTION_END, or by using the ContinuousUDParser API. Your parser must scan for the beginning of the portion, find the first record boundary after that position, and parse to the end of the last record beginning in that portion. Be aware that this behavior might require that the parser read beyond the end of the portion.

  • Handle the special case of a portion containing no record start by returning without writing any output.

In your UDChunker subclass, implement alignPortion(). See Aligning Portions.
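
As a minimal sketch of the parser-factory side, the apportioning declarations might look like the following. The isChunkerApportionable() signature follows the factory in C++ example: delimited parser and chunker; the argument-free isParserApportionable() signature shown here is an assumption, so check the SDK headers for the exact declaration.

class MyApportionableParserFactory : public ParserFactory {
public:
    // Declare that the parser can begin parsing at an arbitrary portion offset.
    virtual bool isParserApportionable() {
        return true;
    }

    // Declare that the chunker returned by prepareChunker() can align portions.
    virtual bool isChunkerApportionable(ServerInterface &srvInterface) {
        return true;
    }

    // prepare(), prepareChunker(), getParserReturnType(), and so on as usual.
};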

Example

The SDK provides a C++ example of apportioned load in the ApportionLoadFunctions directory:

  • FilePortionSource is a subclass of UDSource.

  • DelimFilePortionParser is a subclass of ContinuousUDParser.

Use these classes together. You could also use FilePortionSource with the built-in delimited parser.

The following example shows how you can load the libraries and create the functions in the database:


=> CREATE LIBRARY FilePortionSourceLib as '/home/dbadmin/FP.so';

=> CREATE LIBRARY DelimFilePortionParserLib as '/home/dbadmin/Delim.so';

=> CREATE SOURCE FilePortionSource AS
LANGUAGE 'C++' NAME 'FilePortionSourceFactory' LIBRARY FilePortionSourceLib;

=> CREATE PARSER DelimFilePortionParser AS
LANGUAGE 'C++' NAME 'DelimFilePortionParserFactory' LIBRARY DelimFilePortionParserLib;

The following example shows how you can use the source and parser to load data:


=> COPY t WITH SOURCE FilePortionSource(file='g1/*.dat') PARSER DelimFilePortionParser(delimiter = '|',
    record_terminator = '~');

4.3 - Combining cooperative parse and apportioned load

You can enable both Cooperative Parse and Apportioned Load in the same parser, allowing Vertica to decide how to load data.

You can enable both Cooperative parse and Apportioned load in the same parser, allowing Vertica to decide how to load data.

Deciding how to divide a load

Vertica uses apportioned load, where possible, at query-planning time. It decides whether to also use cooperative parse at execution time.

Apportioned load requires SourceFactory support. Given a suitable UDSource, at planning time Vertica calls the isParserApportionable() method on the ParserFactory. If this method returns true, Vertica apportions the load.

If isParserApportionable() returns false but isChunkerApportionable() returns true, then a chunker is available for cooperative parse and that chunker supports apportioned load. Vertica apportions the load.

If neither of these methods returns true, then Vertica does not apportion the load.

At execution time, Vertica first checks whether the load is running in unfenced mode and proceeds only if it is. Cooperative parse is not supported in fenced mode.

If the load is not apportioned, and more than one thread is available, Vertica uses cooperative parse.

If the load is apportioned, and exactly one thread is available, Vertica uses cooperative parse if and only if the parser is not apportionable. In this case, the chunker is apportionable but the parser is not.

If the load is apportioned, and more than one thread is available, and the chunker is apportionable, Vertica uses cooperative parse.

If Vertica uses cooperative parse but prepareChunker() does not return a UDChunker instance, Vertica reports an error.

Executing apportioned, cooperative loads

If a load uses both apportioned load and cooperative parse, Vertica uses the SourceFactory to break the input into portions. It then assigns the portions to execution nodes. See How Vertica Apportions a Load.

On the execution node, Vertica calls the chunker's alignPortion() method to align the input with portion boundaries. (This step is skipped for the first portion, which by definition is already aligned at the beginning.) This step is necessary because a parser using apportioned load sometimes has to read beyond the end of the portion, so a chunker needs to find the end point.

After aligning the portion, Vertica calls the chunker's process() method repeatedly. See How Vertica Divides a Load.

The chunks found by the chunker are then sent to the parser's process() method for processing in the usual way.

5 - Continuous load

The ContinuousUDSource, ContinuousUDFilter, and ContinuousUDParser classes allow you to write and process data as needed instead of having to iterate through the data.

The ContinuousUDSource, ContinuousUDFilter, and ContinuousUDParser classes allow you to write and process data as needed instead of having to iterate through the data. The Python API does not support continuous load.

Each class includes the following functions:

  • initialize() - Invoked before run(). You can optionally override this function to perform setup and initialization.

  • run() - Processes the data.

  • deinitialize() - Invoked after run() has returned. You can optionally override this function to perform tear-down and destruction.

Do not override the setup(), process(), and destroy() functions that are inherited from parent classes.

You can use the yield() function to yield control back to the server during idle or busy loops so the server can check for status changes or query cancellations.

These three classes use associated ContinuousReader and ContinuousWriter classes to read input data and write output data.

ContinuousUDSource API (C++)

The ContinuousUDSource class extends UDSource and adds the following methods for extension by subclasses:

virtual void initialize(ServerInterface &srvInterface);

virtual void run();

virtual void deinitialize(ServerInterface &srvInterface);
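
For example, a run() implementation might push data through the associated ContinuousWriter and call yield() during long-running work. In this sketch, the writer member name (cw) and its write(buffer, length) signature are assumptions based on the description above; consult the SDK API documentation for the exact interface.

class StaticSource : public ContinuousUDSource {
public:
    virtual void run() {
        // Hypothetical ContinuousWriter member cw pushes bytes to the server.
        const char payload[] = "1\n2\n3\n";
        cw.write(payload, sizeof(payload) - 1);
        // In a long-running loop, call yield() periodically so the server can
        // check for status changes or query cancellation.
        yield();
    }
};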

ContinuousUDFilter API (C++)

The ContinuousUDFilter class extends UDFilter and adds the following methods for extension by subclasses:

virtual void initialize(ServerInterface &srvInterface);

virtual void run();

virtual void deinitialize(ServerInterface &srvInterface);

ContinuousUDParser API (C++)

The ContinuousUDParser class extends UDParser and adds the following methods for extension by subclasses:

virtual void initialize(ServerInterface &srvInterface);

virtual void run();

virtual void deinitialize(ServerInterface &srvInterface);

ContinuousUDParser API (Java)

In Java, the ContinuousUDParser class extends UDParser and adds the following methods for extension by subclasses:


public void initialize(ServerInterface srvInterface, SizedColumnTypes returnType);

public abstract void run() throws UdfException;

public void deinitialize(ServerInterface srvInterface, SizedColumnTypes returnType);

See the API documentation for additional utility methods.

6 - Buffer classes

Buffer classes are used as handles to the raw data stream for all UDL functions.

Buffer classes are used as handles to the raw data stream for all UDL functions. The C++ and Java APIs use a single DataBuffer class for both input and output. The Python API has two classes, InputBuffer and OutputBuffer.

DataBuffer API (C++, Java)

The DataBuffer class has a pointer to a buffer, the size of the buffer, and an offset indicating how much of the stream has been consumed.

/**
* A contiguous in-memory buffer of char *
*/
struct DataBuffer {
    /// Pointer to the start of the buffer
    char * buf;

    /// Size of the buffer in bytes
    size_t size;

    /// Number of bytes that have been processed by the UDL
    size_t offset;
};
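
A UDL's process() method typically scans buf between offset and size and advances offset to record how much it has consumed. The following sketch uses only the fields defined above; the surrounding helper function is illustrative.

// Count record terminators in the unconsumed part of the buffer and mark
// all of it as consumed by advancing the offset to the end of the buffer.
static size_t countRecords(DataBuffer &input, char terminator)
{
    size_t count = 0;
    for (size_t i = input.offset; i < input.size; ++i) {
        if (input.buf[i] == terminator) {
            ++count;
        }
    }
    input.offset = input.size;  // all bytes have been processed
    return count;
}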

The Java DataBuffer class likewise has an offset indicating how much of the stream has been consumed. Because Java strings require attention to character encodings, the UDx must decode or encode the buffer's bytes itself. A parser can interact with the stream by accessing the buffer directly.

/**
* DataBuffer is a contiguous in-memory buffer of data.
*/
public class DataBuffer {

/**
* The buffer of data.
*/
public byte[] buf;

/**
* An offset into the buffer that is typically used to track progress
* through the DataBuffer. For example, a UDParser advances the
* offset as it consumes data from the DataBuffer.
*/
public int offset;
}

InputBuffer and OutputBuffer APIs (Python)

The Python InputBuffer and OutputBuffer classes replace the DataBuffer class in the C++ and Java APIs.

InputBuffer class

The InputBuffer class decodes and translates raw data streams depending on the specified encoding. Python natively supports a wide range of languages and codecs. The InputBuffer is an argument to the process() method for both UDFilters and UDParsers. A user interacts with the UDL's data stream by calling methods of the InputBuffer.

If you do not specify a value for setEncoding(), Vertica assumes a value of NONE.

class InputBuffer:
    def getSize(self):
        ...
    def getOffset(self):
        ...

    def setEncoding(self, encoding):
        """
        Set the encoding of the data contained in the underlying buffer
        """
        pass

    def peek(self, length = None):
        """
        Copy data from the input buffer into Python.
        If no encoding has been specified, returns a Bytes object containing raw data.
        Otherwise, returns data decoded into an object corresponding to the specified encoding
        (for example, 'utf-8' would return a string).
        If length is None, returns all available data.
        If length is not None then the length of the returned object is at most what is requested.
        This method does not advance the buffer offset.
        """
        pass

    def read(self, length = None):
        """
        See peek().
        This method does the same thing as peek(), but it also advances the
        buffer offset by the number of bytes consumed.
        """
        pass

    # Advances the DataBuffer offset by a number of bytes equal to the result
    # of calling "read" with the same arguments.
    def advance(self, length = None):
        """
        Advance the buffer offset by the number of bytes indicated by
        the length and encoding arguments.  See peek().
        Returns the new offset.
        """
        pass

OutputBuffer class

The OutputBuffer class encodes and outputs data from Python to Vertica. The OutputBuffer is an argument to the process() method for both UDFilters and UDParsers. A user interacts with the UDL's data stream by calling methods of the OutputBuffer to manipulate and encode data.

The write() method transfers all data from the Python client to Vertica. The output buffer can accept any size object. If a user writes an object to the OutputBuffer larger than Vertica can immediately process, Vertica stores the overflow. During the next call to process(), Vertica checks for leftover data. If there is any, Vertica copies it to the DataBuffer before determining whether it needs to call process() from the Python UDL.

If you do not specify a value for setEncoding(), Vertica assumes a value of NONE.

class OutputBuffer:
    def setEncoding(self, encoding):
        """
        Specify the encoding of the data which will be written to the underlying buffer
        """
        pass

    def write(self, data):
        """
        Transfer bytes from the data object into Vertica.
        If an encoding was specified via setEncoding(), the data object will be
        converted to bytes using the specified encoding.
        Otherwise, the data argument is expected to be a Bytes object and is
        copied to the underlying buffer.
        """
        pass