User-defined source

A user-defined source allows you to process a source of data using a method that is not built into Vertica. For example, you can write a user-defined source to access the data from an HTTP source using cURL. While a given COPY statement can specify only one user-defined source, the source function itself can pull data from multiple sources.

The UDSource class acquires data from an external source. It reads data from an input stream and produces an output stream to be filtered and parsed. If you implement a UDSource, you must also implement a corresponding SourceFactory.

1 - UDSource class

You can subclass the UDSource class when you need to load data from a source type that COPY does not already support.

Each instance of your UDSource subclass reads from a single data source. Examples of a single data source are a single file or the results of a single function call to a RESTful web application.

UDSource methods

Your UDSource subclass must override process() or processWithMetadata():

  • process() reads the raw input stream as one large file. If there are any errors or failures, the entire load fails.

  • processWithMetadata() is useful when the data source has metadata about record boundaries available in some structured format that's separate from the data payload. With this interface, the source emits a record length for each record in addition to the data.

    By implementing processWithMetadata() instead of process() in each phase, you can retain this record length metadata throughout the load stack, which enables a more efficient parse that can recover from errors on a per-message basis, rather than a per-file or per-source basis. KafkaSource and the Kafka parsers (KafkaAvroParser, KafkaJSONParser, and KafkaParser) use this mechanism to support per-Kafka-message rejections when individual Kafka messages cannot be parsed.

Additionally, you can override the other UDSource class methods.

Source execution

The following sections detail the execution sequence each time a user-defined source is called. They assume a source that overrides the process() method.

Setting Up
COPY calls setup() before the first time it calls process(). Use setup() to perform any necessary setup steps to access the data source. This method establishes network connections, opens files, and similar tasks that need to be performed before the UDSource can read data from the data source. Your object might be destroyed and re-created during use, so make sure that your object is restartable.

Processing a Source
COPY calls process() repeatedly during query execution to read data and write it to the DataBuffer passed as a parameter. This buffer is then passed to the first filter.

If the source runs out of input, or fills the output buffer, it must return the value StreamState.OUTPUT_NEEDED. When Vertica gets this return value, it will call the method again. This second call occurs after the output buffer has been processed by the next stage in the data-load process. Returning StreamState.DONE indicates that all of the data from the source has been read.
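
The following minimal sketch illustrates this contract for a hypothetical source that serves a string held in memory; the data and pos members are illustrative, not part of the SDK:

// Minimal sketch: "data" (std::string) and "pos" (size_t) are hypothetical
// members of the UDSource subclass; <cstring> and <algorithm> are assumed.
virtual StreamState process(ServerInterface &srvInterface, DataBuffer &output) {
    size_t toCopy = std::min(data.size() - pos, output.size - output.offset);
    memcpy(output.buf + output.offset, data.data() + pos, toCopy);
    output.offset += toCopy;
    pos += toCopy;
    // If data remains, return OUTPUT_NEEDED so that Vertica calls
    // process() again after downstream stages consume the buffer.
    return (pos < data.size()) ? OUTPUT_NEEDED : DONE;
}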

The user can cancel the load operation, which aborts reading.

Tearing Down
COPY calls destroy() after the last time that process() is called. This method frees any resources reserved by the setup() or process() methods, such as file handles or network connections that the setup() method allocated.

Accessors

A source can define two accessors, getSize() and getUri().

COPY might call getSize() to estimate the number of bytes of data to be read before calling process(). This value is an estimate only and is used to indicate the file size in the LOAD_STREAMS table. Because Vertica can call this method before calling setup(), getSize() must not rely on any resources allocated by setup().

This method should not leave any resources open. For example, do not save any file handles opened by getSize() for use by the process() method. Doing so can exhaust the available resources, because Vertica calls getSize() on all instances of your UDSource subclass before any data is loaded. If many data sources are being opened, these open file handles could use up the system's supply of file handles. Thus, none would remain available to perform the actual data load.

Vertica calls getUri() during execution to update status information about which resources are currently being loaded. It returns the URI of the data source being read by this UDSource.
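
For example, a file-based source might implement these accessors as follows. This is a sketch only: filename is an assumed member variable, <sys/stat.h> provides stat(), and -1 stands in for an unknown size.

// Sketch: "filename" is a hypothetical member of a file-reading UDSource.
virtual vint getSize() {
    // Estimate the size without holding any resources open;
    // Vertica can call this before setup().
    struct stat fileStats;
    return (stat(filename.c_str(), &fileStats) == 0) ? fileStats.st_size : -1;
}

virtual std::string getUri() {
    return "file://" + filename;
}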

API

The C++ UDSource API provides the following methods for extension by subclasses:

virtual void setup(ServerInterface &srvInterface);

virtual bool useSideChannel();

virtual StreamState process(ServerInterface &srvInterface, DataBuffer &output)=0;

virtual StreamState processWithMetadata(ServerInterface &srvInterface, DataBuffer &output, LengthBuffer &output_lengths)=0;

virtual void cancel(ServerInterface &srvInterface);

virtual void destroy(ServerInterface &srvInterface);

virtual vint getSize();

virtual std::string getUri();

The Java UDSource API provides the following methods for extension by subclasses:

public void setup(ServerInterface srvInterface) throws UdfException;

public abstract StreamState process(ServerInterface srvInterface, DataBuffer output) throws UdfException;

protected void cancel(ServerInterface srvInterface);

public void destroy(ServerInterface srvInterface) throws UdfException;

public Integer getSize();

public String getUri();

2 - SourceFactory class

If you write a source, you must also write a source factory. Your subclass of the SourceFactory class is responsible for:

  • Performing the initial validation of the parameters passed to your UDSource.

  • Setting up any data structures your UDSource instances need to perform their work. This information can include recording which nodes will read which data source.

  • Creating one instance of your UDSource subclass for each data source (or portion thereof) that your function reads on each host.

The simplest source factory creates one UDSource instance per data source per executor node. You can also use multiple concurrent UDSource instances on each node. This behavior is called concurrent load. To support both options, SourceFactory has two versions of the method that creates the sources. You must implement exactly one of them.

Source factories are singletons. Your subclass must be stateless, with no fields containing data. The subclass also must not modify any global variables.

SourceFactory methods

The SourceFactory class defines several methods. Your class must override prepareUDSources(); it may override the other methods.

Setting up

Vertica calls plan() once on the initiator node to perform the following tasks:

  • Check the parameters the user supplied to the function call in the COPY statement and provide error messages if there are any issues. You can read the parameters by getting a ParamReader object from the instance of ServerInterface passed into the plan() method.

  • Decide which hosts in the cluster will read the data source. How you divide up the work depends on the source your function is reading. Some sources can be split across many hosts, such as a source that reads data from many URLs. Others, such as an individual local file on a host's file system, can be read only by a single specified host.

    You store the list of hosts to read the data source by calling the setTargetNodes() method on the NodeSpecifyingPlanContext object. This object is passed into your plan() method.

  • Store any information that the individual hosts need to process the data sources in the NodeSpecifyingPlanContext instance passed to the plan() method. For example, you could store assignments that tell each host which data sources to process. The plan() method runs only on the initiator node, and the prepareUDSources() method runs on each host reading from a data source. Therefore, this object is the only means of communication between them.

    You store data in the NodeSpecifyingPlanContext by getting a ParamWriter object from the getWriter() method. You then write parameters by calling methods on the ParamWriter such as setString().
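
For example, a plan() body following this pattern might look like the following sketch; the url parameter name is illustrative:

// Sketch of a plan() body: validate a parameter, choose nodes, and
// record state for the executor nodes to read later.
if (!srvInterface.getParamReader().containsParameter("url")) {
    vt_report_error(0, "You must supply a url parameter");
}
// Here, every node in the cluster reads from the source.
planCtxt.setTargetNodes(planCtxt.getClusterNodes());
// Save the parameter where prepareUDSources() can read it.
planCtxt.getWriter().setString("url",
        srvInterface.getParamReader().getStringRef("url").str());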

Creating sources

Vertica calls prepareUDSources() on all hosts that the plan() method selected to load data. This call instantiates and returns a list of UDSource subclass instances. If you are not using concurrent load, return one UDSource for each of the sources that the host is assigned to process. If you are using concurrent load, use the version of the method that takes an ExecutorPlanContext as a parameter, and return as many sources as you can use. Your factory must implement exactly one of these methods.

For concurrent load, you can find out how many threads are available on the node to run UDSource instances by calling getLoadConcurrency() on the ExecutorPlanContext that is passed in.
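
The following sketch shows the shape of the concurrent version; makePortionSource() is a hypothetical helper that builds a UDSource for one portion of the input:

virtual std::vector<UDSource *> prepareUDSourcesExecutor(ServerInterface &srvInterface,
        ExecutorPlanContext &planCtxt) {
    std::vector<UDSource *> sources;
    // Create no more sources than this node has threads to run them.
    ssize_t concurrency = planCtxt.getLoadConcurrency();
    for (ssize_t i = 0; i < concurrency; i++) {
        // makePortionSource() is a hypothetical helper that returns the
        // i-th of "concurrency" portions as a UDSource instance.
        sources.push_back(makePortionSource(srvInterface, planCtxt, i, concurrency));
    }
    return sources;
}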

Defining parameters

Implement getParameterType() to define the names and types of parameters that your source uses. Vertica uses this information to warn callers about unknown or missing parameters. Vertica ignores unknown parameters and uses default values for missing parameters. While you should define the types and parameters for your function, you are not required to override this method.
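
For example, the following override (mirroring the CurlSourceFactory example later in this section) declares a single VARCHAR parameter named url:

virtual void getParameterType(ServerInterface &srvInterface,
                              SizedColumnTypes &parameterTypes) {
    parameterTypes.addVarchar(65000, "url");
}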

Requesting threads for concurrent load

When a source factory creates sources on an executor node, by default, it creates one thread per source. If your sources can use multiple threads, implement getDesiredThreads(). Vertica calls this method before it calls prepareUDSources(), so you can also use it to decide how many sources to create. Return the number of threads your factory can use for sources. The maximum number of available threads is passed in, so you can take that into account. The value your method returns is a hint, not a guarantee; each executor node determines the number of threads to allocate. The FilePortionSourceFactory example implements this method; see C++ example: concurrent load.

You can allow your source to have control over parallelism, meaning that it can divide a single input into multiple load streams, by implementing isSourceApportionable(). Returning true from this method does not guarantee that the source will apportion the load. However, returning false indicates that it will not try to do so. See Apportioned load for more information.

Often, a SourceFactory that implements getDesiredThreads() also uses apportioned load. However, using apportioned load is not a requirement. A source reading from Kafka streams, for example, could use multiple threads without apportioning.

API

The C++ SourceFactory API provides the following methods for extension by subclasses:

virtual void plan(ServerInterface &srvInterface, NodeSpecifyingPlanContext &planCtxt);

// must implement exactly one of prepareUDSources() or prepareUDSourcesExecutor()
virtual std::vector< UDSource * > prepareUDSources(ServerInterface &srvInterface,
            NodeSpecifyingPlanContext &planCtxt);

virtual std::vector< UDSource * > prepareUDSourcesExecutor(ServerInterface &srvInterface,
            ExecutorPlanContext &planCtxt);

virtual void getParameterType(ServerInterface &srvInterface,
            SizedColumnTypes &parameterTypes);

virtual bool isSourceApportionable();

ssize_t getDesiredThreads(ServerInterface &srvInterface,
            ExecutorPlanContext &planContext);

After creating your SourceFactory, you must register it with the RegisterFactory macro.

The Java SourceFactory API provides the following methods for extension by subclasses:

public void plan(ServerInterface srvInterface, NodeSpecifyingPlanContext planCtxt)
    throws UdfException;

// must implement one overload of prepareUDSources()
public ArrayList< UDSource > prepareUDSources(ServerInterface srvInterface,
                NodeSpecifyingPlanContext planCtxt)
    throws UdfException;

public ArrayList< UDSource > prepareUDSources(ServerInterface srvInterface,
                ExecutorPlanContext planCtxt)
    throws UdfException;

public void getParameterType(ServerInterface srvInterface, SizedColumnTypes parameterTypes);

public boolean isSourceApportionable();

public int getDesiredThreads(ServerInterface srvInterface,
                ExecutorPlanContext planCtxt)
    throws UdfException;

3 - C++ example: CurlSource

The CurlSource example allows you to use cURL to open and read a file over HTTP. The example provided is part of /opt/vertica/sdk/examples/SourceFunctions/cURL.cpp.

Source implementation

This example uses the helper library available in /opt/vertica/sdk/examples/HelperLibraries/.

CurlSource loads the data in chunks. If it reaches the end of the file, the process() method returns DONE. Otherwise, the method returns OUTPUT_NEEDED and Vertica calls it again to process another chunk of data. The functions included in the helper library (such as url_fread() and url_fopen()) are based on examples that come with the libcurl library. For an example, see http://curl.haxx.se/libcurl/c/fopen.html.

The setup() function opens a file handle and the destroy() function closes it. Both use functions from the helper library.

class CurlSource : public UDSource {
private:
    URL_FILE *handle;
    std::string url;
    virtual StreamState process(ServerInterface &srvInterface, DataBuffer &output) {
        output.offset = url_fread(output.buf, 1, output.size, handle);
        return url_feof(handle) ? DONE : OUTPUT_NEEDED;
    }
public:
    CurlSource(std::string url) : url(url) {}
    void setup(ServerInterface &srvInterface) {
        handle = url_fopen(url.c_str(),"r");
    }
    void destroy(ServerInterface &srvInterface) {
        url_fclose(handle);
    }
};

Factory implementation

CurlSourceFactory produces CurlSource instances.

class CurlSourceFactory : public SourceFactory {
public:
    virtual void plan(ServerInterface &srvInterface,
            NodeSpecifyingPlanContext &planCtxt) {
        std::vector<std::string> args = srvInterface.getParamReader().getParamNames();
       /* Check parameters */
        if (args.size() != 1 || find(args.begin(), args.end(), "url") == args.end()) {
            vt_report_error(0, "You must provide a single URL.");
        }
        /* Populate planData */
        planCtxt.getWriter().getStringRef("url").copy(
                                    srvInterface.getParamReader().getStringRef("url"));

        /* Assign Nodes */
        std::vector<std::string> executionNodes = planCtxt.getClusterNodes();
        while (executionNodes.size() > 1) executionNodes.pop_back();
        // Only run on the first node in the list.
        planCtxt.setTargetNodes(executionNodes);
    }
    virtual std::vector<UDSource*> prepareUDSources(ServerInterface &srvInterface,
            NodeSpecifyingPlanContext &planCtxt) {
        std::vector<UDSource*> retVal;
        retVal.push_back(vt_createFuncObj(srvInterface.allocator, CurlSource,
                planCtxt.getReader().getStringRef("url").str()));
        return retVal;
    }
    virtual void getParameterType(ServerInterface &srvInterface,
                                  SizedColumnTypes &parameterTypes) {
        parameterTypes.addVarchar(65000, "url");
    }
};
RegisterFactory(CurlSourceFactory);

4 - C++ example: concurrent load

The FilePortionSource example demonstrates the use of concurrent load. This example is a refinement of the FileSource example. Each input file is divided into portions and distributed to FilePortionSource instances. The source accepts a list of offsets at which to break the input into portions; if offsets are not provided, the source divides the input dynamically.

Concurrent load is handled in the factory, so this discussion focuses on FilePortionSourceFactory. The full code for the example is located in /opt/vertica/sdk/examples/ApportionLoadFunctions. The distribution also includes a Java version of this example.

Loading and using the example

Load and use the FilePortionSource example as follows.

=> CREATE LIBRARY FilePortionLib AS '/home/dbadmin/FP.so';

=> CREATE SOURCE FilePortionSource AS LANGUAGE 'C++'
-> NAME 'FilePortionSourceFactory' LIBRARY FilePortionLib;

=> COPY t WITH SOURCE FilePortionSource(file='g1/*.dat', nodes='initiator,e0,e1', offsets = '0,380000,820000');

=> COPY t WITH SOURCE FilePortionSource(file='g2/*.dat', nodes='e0,e1,e2', local_min_portion_size = 2097152);

Implementation

Concurrent load affects the source factory in two places, getDesiredThreads() and prepareUDSourcesExecutor().

getDesiredThreads()

The getDesiredThreads() member function determines the number of threads to request. Vertica calls this member function on each executor node before calling prepareUDSourcesExecutor().

The function begins by breaking an input file path, which might be a glob, into individual paths. This discussion omits those details. If apportioned load is not being used, then the function allocates one source per file.

virtual ssize_t getDesiredThreads(ServerInterface &srvInterface,
    ExecutorPlanContext &planCtxt) {
  const std::string filename = srvInterface.getParamReader().getStringRef("file").str();

  std::vector<std::string> paths;
  // expand the glob - at least one thread per source.
  ...

  // figure out how to assign files to sources
  const std::string nodeName = srvInterface.getCurrentNodeName();
  const size_t nodeId = planCtxt.getWriter().getIntRef(nodeName);
  const size_t numNodes = planCtxt.getTargetNodes().size();

  if (!planCtxt.canApportionSource()) {
    /* no apportioning, so the number of files is the final number of sources */
    std::vector<std::string> *expanded =
        vt_createFuncObject<std::vector<std::string> >(srvInterface.allocator, paths);
    /* save expanded paths so we don't have to compute expansion again */
    planCtxt.getWriter().setPointer("expanded", expanded);
    return expanded->size();
  }

  // ...

If the source can be apportioned, then getDesiredThreads() uses the offsets that were passed as arguments to the factory to divide the file into portions. It then allocates portions to available nodes. This function does not actually assign sources; it does this work only to determine how many threads to request.

  else if (srvInterface.getParamReader().containsParameter("offsets")) {

    // if the offsets are specified, then we will have a fixed number of portions per file.
    // Round-robin assign offsets to nodes.
    // ...

    /* Construct the portions that this node will actually handle.
     * This isn't changing (since the offset assignments are fixed),
     * so we'll build the Portion objects now and make them available
     * to prepareUDSourcesExecutor() by putting them in the ExecutorContext.
     *
     * We don't know the size of the last portion, since it depends on the file
     * size.  Rather than figure it out here we will indicate it with -1 and
     * defer that to prepareUDSourcesExecutor().
     */
    std::vector<Portion> *portions =
        vt_createFuncObject<std::vector<Portion>>(srvInterface.allocator);

    for (std::vector<size_t>::const_iterator offset = offsets.begin();
            offset != offsets.end(); ++offset) {
        Portion p(*offset);
        p.is_first_portion = (offset == offsets.begin());
        p.size = (offset + 1 == offsets.end() ? -1 : (*(offset + 1) - *offset));

        if ((offset - offsets.begin()) % numNodes == nodeId) {
            portions->push_back(p);
            srvInterface.log("FilePortionSource: assigning portion %ld: [offset = %lld, size = %lld]",
                    offset - offsets.begin(), p.offset, p.size);
        }
      }

The function now has all the portions and thus the number of portions:

      planCtxt.getWriter().setPointer("portions", portions);

      /* total number of threads we want is the number of portions per file, which is fixed */
      return portions->size() * expanded->size();
    } // end of "offsets" parameter

If offsets were not provided, the function divides the file into portions dynamically, one portion per thread. This discussion omits the details of this computation. There is no point in requesting more threads than are available, so the function calls getMaxAllowedThreads() on the PlanContext (an argument to the function) to set an upper bound:

  if (portions->size() >= planCtxt.getMaxAllowedThreads()) {
    return paths.size();
  }

See the full example for the details of how this function divides the file into portions.

This function uses the vt_createFuncObject template to create objects. Vertica calls the destructors of the UDSource objects that your factory returns, but it does not call the destructors of other objects created with this template, such as the vectors in this example. You must call these destructors yourself to avoid memory leaks. In this example, these calls are made in prepareUDSourcesExecutor().
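
The cleanup might look like the following sketch. The getPointer() accessor is an assumption here, mirroring the setPointer() call shown above:

/* Sketch: retrieve the vector stored by getDesiredThreads() and destroy
 * it explicitly once the sources have been created. getPointer() is an
 * assumed accessor mirroring setPointer(). */
std::vector<std::string> *expanded =
    static_cast<std::vector<std::string> *>(
        planCtxt.getWriter().getPointer("expanded"));
// ... create sources from the expanded paths ...
expanded->~vector();  // Vertica reclaims the memory but does not run
                      // destructors for objects that are not returned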

prepareUDSourcesExecutor()

The prepareUDSourcesExecutor() member function, like getDesiredThreads(), has separate blocks of code depending on whether offsets are provided. In both cases, the function breaks input into portions and creates UDSource instances for them.

If the function is called with offsets, prepareUDSourcesExecutor() calls prepareCustomizedPortions(). This function follows.

/* prepare portions as determined via the "offsets" parameter */
void prepareCustomizedPortions(ServerInterface &srvInterface,
                               ExecutorPlanContext &planCtxt,
                               std::vector<UDSource *> &sources,
                               const std::vector<std::string> &expandedPaths,
                               std::vector<Portion> &portions) {
    for (std::vector<std::string>::const_iterator filename = expandedPaths.begin();
            filename != expandedPaths.end(); ++filename) {
        /*
         * the "portions" vector contains the portions which were generated in
         * "getDesiredThreads"
         */
        const size_t fileSize = getFileSize(*filename);
        for (std::vector<Portion>::const_iterator portion = portions.begin();
                portion != portions.end(); ++portion) {
            Portion fportion(*portion);
            if (fportion.size == -1) {
                /* as described above, this means from the offset to the end */
                fportion.size = fileSize - portion->offset;
                sources.push_back(vt_createFuncObject<FilePortionSource>(srvInterface.allocator,
                            *filename, fportion));
            } else if (fportion.size > 0) {
                sources.push_back(vt_createFuncObject<FilePortionSource>(srvInterface.allocator,
                            *filename, fportion));
            }
        }
    }
}

If prepareUDSourcesExecutor() is called without offsets, then it must decide how many portions to create.

The base case is to use one portion per source. However, if extra threads are available, the function divides the input into more portions so that a source can process them concurrently. Then prepareUDSourcesExecutor() calls prepareGeneratedPortions() to create the portions. This function begins by calling getLoadConcurrency() on the plan context to find out how many threads are available.

void prepareGeneratedPortions(ServerInterface &srvInterface,
                              ExecutorPlanContext &planCtxt,
                              std::vector<UDSource *> &sources,
                              std::map<std::string, Portion> initialPortions) {

  if ((ssize_t) initialPortions.size() >= planCtxt.getLoadConcurrency()) {
    /* all threads will be used, don't bother splitting into portions */
    for (std::map<std::string, Portion>::const_iterator file = initialPortions.begin();
         file != initialPortions.end(); ++file) {
      sources.push_back(vt_createFuncObject<FilePortionSource>(srvInterface.allocator,
              file->first, file->second));
    } // for
    return;
  } // if

  // Now we can split files to take advantage of potentially-unused threads.
  // First sort by size (descending), then we will split the largest first.

  // details elided...

}

For more information

See the source code for the full implementation of this example.

5 - Java example: FileSource

The example shown in this section is a simple UDL Source function named FileSource. This function loads data from files stored on the host's file system (similar to the standard COPY statement). To call FileSource, you must supply a parameter named file that contains the absolute path to one or more files on the host file system. You can specify multiple files as a comma-separated list.

The FileSource function also accepts an optional parameter, named nodes, that indicates which nodes should load the files. If you do not supply this parameter, the function defaults to loading data on the initiator node only. Because this example is simple, the nodes load only the files from their own file systems. Any files in the file parameter must exist on all of the hosts in the nodes parameter. The FileSource UDSource attempts to load all of the files in the file parameter on all of the hosts in the nodes parameter.

Generating files

You can use the following Python script to generate files and distribute them to hosts in your Vertica cluster. With these files, you can experiment with the example UDSource function. Running the function requires passwordless-SSH logins to copy the files to the other hosts. Therefore, you must run the script using the database administrator account on one of your database hosts.

#!/usr/bin/python
# Save this file as UDLDataGen.py
import string
import random
import sys
import os

# Read in the dictionary file to provide random words. Assumes the words
# file is located in /usr/share/dict/words
wordFile = open("/usr/share/dict/words")
wordDict = []
for line in wordFile:
    if len(line) > 6:
        wordDict.append(line.strip())

MAXSTR = 4 # Maximum number of words to concatenate
NUMROWS = 1000 # Number of rows of data to generate
#FILEPATH = '/tmp/UDLdata.txt' # Final filename to use for UDL source
TMPFILE = '/tmp/UDLtemp.txt'  # Temporary filename.

# Generate a random string by concatenating several words together. Max
# number of words set by MAXSTR
def randomWords():
    words = [random.choice(wordDict) for n in xrange(random.randint(1, MAXSTR))]
    sentence = " ".join(words)
    return sentence

# Create a temporary data file that will be moved to a node. Number of
# rows for the file is set by NUMROWS. Adds the name of the node which will
# get the file, to show which node loaded the data.
def generateFile(node):
    outFile = open(TMPFILE, 'w')
    for line in xrange(NUMROWS):
        outFile.write('{0}|{1}|{2}\n'.format(line,randomWords(),node))
    outFile.close()

# Copy the temporary file to a node. Only works if passwordless SSH login
# is enabled, which it is for the database administrator account on
# Vertica hosts.
def copyFile(fileName,node):
    os.system('scp "%s" "%s:%s"' % (TMPFILE, node, fileName) )

# Loop through the comma-separated list of nodes given in the first
# parameter, creating and copying data files whose full comma-separated
# paths are passed in the second parameter
for node in [x.strip() for x in sys.argv[1].split(',')]:
    for fileName in [y.strip() for y in sys.argv[2].split(',')]:
        print "generating file", fileName, "for", node
        generateFile(node)
        print "Copying file to",node
        copyFile(fileName,node)

You call this script by giving it a comma-separated list of hosts to receive the files and a comma-separated list of absolute paths of files to generate. For example:

$ python UDLDataGen.py v_vmart_node0001,v_vmart_node0002,v_vmart_node0003 /tmp/UDLdata01.txt,/tmp/UDLdata02.txt,/tmp/UDLdata03.txt

This script generates files that contain a thousand rows of columns delimited with the pipe character (|). These columns contain an index value, a set of random words, and the node for which the file was generated, as shown in the following output sample:

0|megabits embanks|v_vmart_node0001
1|unneatly|v_vmart_node0001
2|self-precipitation|v_vmart_node0001
3|antihistamine scalados Vatter|v_vmart_node0001

Loading and using the example

Load and use the FileSource UDSource as follows:

=> --Load library and create the source function
=> CREATE LIBRARY JavaLib AS '/home/dbadmin/JavaUDlLib.jar'
-> LANGUAGE 'JAVA';
CREATE LIBRARY
=> CREATE SOURCE File as LANGUAGE 'JAVA' NAME
-> 'com.mycompany.UDL.FileSourceFactory' LIBRARY JavaLib;
CREATE SOURCE FUNCTION
=> --Create a table to hold the data loaded from files
=> CREATE TABLE t (i integer, text VARCHAR, node VARCHAR);
CREATE TABLE
=> -- Copy a single file from the current host using the FileSource
=> COPY t SOURCE File(file='/tmp/UDLdata01.txt');
 Rows Loaded
-------------
        1000
(1 row)

=> --See some of what got loaded.
=> SELECT * FROM t WHERE i < 5 ORDER BY i;
 i |             text              |  node
---+-------------------------------+-----------------
 0 | megabits embanks              | v_vmart_node0001
 1 | unneatly                      | v_vmart_node0001
 2 | self-precipitation            | v_vmart_node0001
 3 | antihistamine scalados Vatter | v_vmart_node0001
 4 | fate-menaced toilworn         | v_vmart_node0001
(5 rows)



=> TRUNCATE TABLE t;
TRUNCATE TABLE
=> -- Now load a file from three hosts. All of these hosts must have a file
=> -- named /tmp/UDLdata01.txt, each with different data
=> COPY t SOURCE File(file='/tmp/UDLdata01.txt',
-> nodes='v_vmart_node0001,v_vmart_node0002,v_vmart_node0003');
 Rows Loaded
-------------
        3000
(1 row)

=> --Now see what has been loaded
=> SELECT * FROM t WHERE i < 5 ORDER BY i,node ;
 i |                      text                       |       node
---+-------------------------------------------------+------------------
 0 | megabits embanks                                | v_vmart_node0001
 0 | nimble-eyed undupability frowsier               | v_vmart_node0002
 0 | Circean nonrepellence nonnasality               | v_vmart_node0003
 1 | unneatly                                        | v_vmart_node0001
 1 | floatmaker trabacolos hit-in                    | v_vmart_node0002
 1 | revelrous treatableness Halleck                 | v_vmart_node0003
 2 | self-precipitation                              | v_vmart_node0001
 2 | whipcords archipelagic protodonatan copycutter  | v_vmart_node0002
 2 | Paganalian geochemistry short-shucks            | v_vmart_node0003
 3 | antihistamine scalados Vatter                   | v_vmart_node0001
 3 | swordweed touristical subcommanders desalinized | v_vmart_node0002
 3 | batboys                                         | v_vmart_node0003
 4 | fate-menaced toilworn                           | v_vmart_node0001
 4 | twice-wanted cirrocumulous                      | v_vmart_node0002
 4 | doon-head-clock                                 | v_vmart_node0003
(15 rows)

=> TRUNCATE TABLE t;
TRUNCATE TABLE
=> --Now copy from several files on several hosts
=> COPY t SOURCE File(file='/tmp/UDLdata01.txt,/tmp/UDLdata02.txt,/tmp/UDLdata03.txt'
-> ,nodes='v_vmart_node0001,v_vmart_node0002,v_vmart_node0003');
 Rows Loaded
-------------
        9000
(1 row)

=> SELECT * FROM t WHERE i = 0 ORDER BY node ;
 i |                    text                     |       node
---+---------------------------------------------+------------------
 0 | Awolowo Mirabilis D'Amboise                 | v_vmart_node0001
 0 | sortieing Divisionism selfhypnotization     | v_vmart_node0001
 0 | megabits embanks                            | v_vmart_node0001
 0 | nimble-eyed undupability frowsier           | v_vmart_node0002
 0 | thiaminase hieroglypher derogated soilborne | v_vmart_node0002
 0 | aurigraphy crocket stenocranial             | v_vmart_node0002
 0 | Khulna pelmets                              | v_vmart_node0003
 0 | Circean nonrepellence nonnasality           | v_vmart_node0003
 0 | matterate protarsal                         | v_vmart_node0003
(9 rows)

Source implementation

The following code shows the source of the FileSource class that reads a file from the host file system. The constructor, which is called by FileSourceFactory.prepareUDSources(), gets the absolute path for the file containing the data to be read. The setup() method opens the file and the destroy() method closes it. The process() method reads from the file into a buffer provided by the instance of the DataBuffer class passed to it as a parameter. If the read operation filled the output buffer, it returns OUTPUT_NEEDED. This value tells Vertica to call the method again after the next stage of the load has processed the output buffer. If the read did not fill the output buffer, then process() returns DONE to indicate it has finished processing the data source.

package com.mycompany.UDL;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.io.RandomAccessFile;

import com.vertica.sdk.DataBuffer;
import com.vertica.sdk.ServerInterface;
import com.vertica.sdk.State.StreamState;
import com.vertica.sdk.UDSource;
import com.vertica.sdk.UdfException;

public class FileSource extends UDSource {

    private String filename;  // The file for this UDSource to read
    private RandomAccessFile reader;   // handle to read from file


    // The constructor just stores the absolute filename of the file it will
    // read.
    public FileSource(String filename) {
        super();
        this.filename = filename;
    }

    // Called before Vertica starts requesting data from the data source.
    // In this case, setup needs to open the file and save to the reader
    // property.
    @Override
    public void setup(ServerInterface srvInterface ) throws UdfException{
        try {
            reader = new RandomAccessFile(new File(filename), "r");
        } catch (FileNotFoundException e) {
            // In case of any error, throw a UDfException. This will terminate
            // the data load.
             String msg = e.getMessage();
             throw new UdfException(0, msg);
        }
    }

    // Called after data has been loaded. In this case, close the file handle.
    @Override
    public void destroy(ServerInterface srvInterface ) throws UdfException {
        if (reader != null) {
            try {
                reader.close();
            } catch (IOException e) {
                String msg = e.getMessage();
                 throw new UdfException(0, msg);
            }
        }
    }

    @Override
    public StreamState process(ServerInterface srvInterface, DataBuffer output)
                                throws UdfException {

        // Read up to the size of the buffer provided in the DataBuffer.buf
        // property. Here we read directly from the file handle into the
        // buffer.
        long offset;
        try {
            offset = reader.read(output.buf,output.offset,
                                 output.buf.length-output.offset);
        } catch (IOException e) {
            // Throw an exception in case of any errors.
            String msg = e.getMessage();
            throw new UdfException(0, msg);
        }

        // Update the number of bytes processed so far by the data buffer.
        // (A return value of -1 signals end of file, not a byte count.)
        if (offset > 0) {
            output.offset += offset;
        }

        // See if the end of the data source has been reached, or if less
        // data was read than can fit in the buffer
        if(offset == -1 || offset < output.buf.length) {
            // No more data to read.
            return StreamState.DONE;
        }else{
            // Tell Vertica to call again when buffer has been emptied
            return StreamState.OUTPUT_NEEDED;
        }
    }
}

Factory implementation

The following code is a modified version of the example Java UDSource function provided in the Java UDx support package. You can find the full example in /opt/vertica/sdk/examples/JavaUDx/UDLFuctions/com/vertica/JavaLibs/FileSourceFactory.java. Its override of the plan() method verifies that the user supplied the required file parameter. If the user also supplied the optional nodes parameter, this method verifies that the nodes exist in the Vertica cluster. If there is a problem with either parameter, the method throws an exception to return an error to the user. If there are no issues with the parameters, the plan() method stores their values in the plan context object.

package com.mycompany.UDL;

import java.util.ArrayList;
import java.util.Vector;
import com.vertica.sdk.NodeSpecifyingPlanContext;
import com.vertica.sdk.ParamReader;
import com.vertica.sdk.ParamWriter;
import com.vertica.sdk.ServerInterface;
import com.vertica.sdk.SizedColumnTypes;
import com.vertica.sdk.SourceFactory;
import com.vertica.sdk.UDSource;
import com.vertica.sdk.UdfException;

public class FileSourceFactory extends SourceFactory {

    // Called once on the initiator host to do initial setup. Checks
    // parameters and chooses which nodes will do the work.
    @Override
    public void plan(ServerInterface srvInterface,
            NodeSpecifyingPlanContext planCtxt) throws UdfException {

        String nodes; // stores the list of nodes that will load data

        // Get a copy of the parameters the user supplied to the UDSource
        // function call.
        ParamReader args =  srvInterface.getParamReader();

        // A list of nodes that will perform work. This gets saved as part
        // of the plan context.
        ArrayList<String> executionNodes = new ArrayList<String>();

        // First, ensure the user supplied the file parameter
        if (!args.containsParameter("file")) {
            // Without a file parameter, we cannot continue. Throw an
            // exception that will be caught by the Java UDx framework.
            throw new UdfException(0, "You must supply a file parameter");
        }

        // If the user specified nodes to read the file, parse the
        // comma-separated list and save. Otherwise, assume just the
        // Initiator node has the file to read.
        if (args.containsParameter("nodes")) {
            nodes = args.getString("nodes");

            // Get list of nodes in cluster, to ensure that the node the
            // user specified actually exists. The list of nodes is available
            // from the planCtxt (plan context) object.
            ArrayList<String> clusterNodes = planCtxt.getClusterNodes();

            // Parse the string parameter "nodes" which
            // is a comma-separated list of node names.
            String[] nodeNames = nodes.split(",");

            for (int i = 0; i < nodeNames.length; i++){
                // See if the node the user gave us actually exists
                if(clusterNodes.contains(nodeNames[i]))
                    // Node exists. Add it to list of nodes.
                    executionNodes.add(nodeNames[i]);
                else{
                    // User supplied node that doesn't exist. Throw an
                    // exception so the user is notified.
                    String msg = String.format("Specified node '%s' but no" +
                        " node by that name is available.  Available nodes "
                        + "are \"%s\".",
                        nodeNames[i], clusterNodes.toString());
                    throw new UdfException(0, msg);
                }
            }
        } else {
            // User did not supply a list of node names. Assume the initiator
            // is the only host that will read the file. The srvInterface
            // instance passed to this method has a getter for the current
            // node.
            executionNodes.add(srvInterface.getCurrentNodeName());
        }

        // Set the target node(s) in the plan context
        planCtxt.setTargetNodes(executionNodes);

        // Set a parameter for each node reading data that tells it which
        // files it will read. In this simple example, just tell every node
        // to read all of the files the user passed in the file parameter.
        String files = args.getString("file");

        // Get object to write parameters into the plan context object.
        ParamWriter nodeParams = planCtxt.getWriter();

        // Loop through list of execution nodes, and add a parameter to plan
        // context named for each node performing the work, which tells it the
        // list of files it will process. Each node will look for a
        // parameter named something like "filesForv_vmart_node0002" in its
        // prepareUDSources() method.
        for (int i = 0; i < executionNodes.size(); i++) {
            nodeParams.setString("filesFor" + executionNodes.get(i), files);
        }
    }

    // Called on each host that is reading data from a source. This method
    // returns an array of UDSource objects that process each source.
    @Override
    public ArrayList<UDSource> prepareUDSources(ServerInterface srvInterface,
            NodeSpecifyingPlanContext planCtxt) throws UdfException {

        // An array to hold the UDSource subclasses that we instantiate
        ArrayList<UDSource> retVal = new ArrayList<UDSource>();

        // Get the list of files this node is supposed to process. This was
        // saved by the plan() method in the plan context.
        String myName = srvInterface.getCurrentNodeName();
        ParamReader params = planCtxt.getReader();
        String fileNames = params.getString("filesFor" + myName);

        // Note that you can also be lazy and grab the parameters the user
        // passed to the UDSource function in the COPY statement directly
        // by getting them from the ServerInterface object. I.e.:

        //String fileNames = srvInterface.getParamReader().getString("file");

        // Split the comma-separated list into individual file names.
        String[] fileList = fileNames.split(",");
        for (int i = 0; i < fileList.length; i++){
            // Instantiate a FileSource object (which is a subclass of UDSource)
            // to read each file. The constructor for FileSource takes the
            // file name of the file to read.
            retVal.add(new FileSource(fileList[i]));
        }

        // Return the collection of FileSource objects. They will be called,
        // in turn, to read each of the files.
        return retVal;
    }

    // Declares which parameters this factory accepts.
    @Override
    public void getParameterType(ServerInterface srvInterface,
                                    SizedColumnTypes parameterTypes) {
        parameterTypes.addVarchar(65000, "file");
        parameterTypes.addVarchar(65000, "nodes");
    }
}