Java example: FileSource

The example shown in this section is a simple UDL Source function named FileSource, This function loads data from files stored on the host's file system (similar to the standard COPY statement).

The example shown in this section is a simple UDL Source function named FileSource, This function loads data from files stored on the host's file system (similar to the standard COPY statement). To call FileSource, you must supply a parameter named file that contains the absolute path to one or more files on the host file system. You can specify multiple files as a comma-separated list.

The FileSource function also accepts an optional parameter, named nodes, that indicates which nodes should load the files. If you do not supply this parameter, the function defaults to loading data on the initiator node only. Because this example is simple, the nodes load only the files from their own file systems. Any files in the file parameter must exist on all of the hosts in the nodes parameter. The FileSource UDSource attempts to load all of the files in the file parameter on all of the hosts in the nodes parameter.

Generating files

You can use the following Python script to generate files and distribute them to hosts in your Vertica cluster. With these files, you can experiment with the example UDSource function. Running the function requires passwordless-SSH logins to copy the files to the other hosts. Therefore, you must run the script using the database administrator account on one of your database hosts.

#!/usr/bin/python
# Save this file as UDLDataGen.py
import string
import random
import sys
import os

# Read in the dictionary file to provide random words. Assumes the words
# file is located in /usr/share/dict/words
wordFile = open("/usr/share/dict/words")
wordDict = []
for line in wordFile:
    if len(line) > 6:
        wordDict.append(line.strip())

MAXSTR = 4 # Maximum number of words to concatentate
NUMROWS = 1000 # Number of rows of data to generate
#FILEPATH = '/tmp/UDLdata.txt' # Final filename to use for UDL source
TMPFILE = '/tmp/UDLtemp.txt'  # Temporary filename.

# Generate a random string by concatenating several words together. Max
# number of words set by MAXSTR
def randomWords():
    words = [random.choice(wordDict) for n in xrange(random.randint(1, MAXSTR))]
    sentence = " ".join(words)
    return sentence

# Create a temporary data file that will be moved to a node. Number of
# rows for the file is set by NUMROWS. Adds the name of the node which will
# get the file, to show which node loaded the data.
def generateFile(node):
    outFile = open(TMPFILE, 'w')
    for line in xrange(NUMROWS):
        outFile.write('{0}|{1}|{2}\n'.format(line,randomWords(),node))
    outFile.close()

# Copy the temporary file to a node. Only works if passwordless SSH login
# is enabled, which it is for the database administrator account on
# Vertica hosts.
def copyFile(fileName,node):
    os.system('scp "%s" "%s:%s"' % (TMPFILE, node, fileName) )

# Loop through the comma-separated list of nodes given in the first
# parameter, creating and copying data files whose full comma-separated
# paths are passed in the second parameter
for node in [x.strip() for x in sys.argv[1].split(',')]:
    for fileName in [y.strip() for y in sys.argv[2].split(',')]:
        print "generating file", fileName, "for", node
        generateFile(node)
        print "Copying file to",node
        copyFile(fileName,node)

You call this script by giving it a comma-separated list of hosts to receive the files and a comma-separated list of absolute paths of files to generate. For example:

$ python UDLDataGen.py v_vmart_node0001,v_vmart_node0002,v_vmart_node0003 /tmp/UDLdata01.txt,/tmp/UDLdata02.txt,UDLdata03.txt

This script generates files that contain a thousand rows of columns delimited with the pipe character (|). These columns contain an index value, a set of random words, and the node for which the file was generated, as shown in the following output sample:

0|megabits embanks|v_vmart_node0001
1|unneatly|v_vmart_node0001
2|self-precipitation|v_vmart_node0001
3|antihistamine scalados Vatter|v_vmart_node0001

Loading and using the example

Load and use the FileSource UDSource as follows:

=> --Load library and create the source function
=> CREATE LIBRARY JavaLib AS '/home/dbadmin/JavaUDlLib.jar'
-> LANGUAGE 'JAVA';
CREATE LIBRARY
=> CREATE SOURCE File as LANGUAGE 'JAVA' NAME
-> 'com.mycompany.UDL.FileSourceFactory' LIBRARY JavaLib;
CREATE SOURCE FUNCTION
=> --Create a table to hold the data loaded from files
=> CREATE TABLE t (i integer, text VARCHAR, node VARCHAR);
CREATE TABLE
=> -- Copy a single file from the currently host using the FileSource
=> COPY t SOURCE File(file='/tmp/UDLdata01.txt');
 Rows Loaded
-------------
        1000
(1 row)

=> --See some of what got loaded.
=> SELECT * FROM t WHERE i < 5 ORDER BY i;
 i |             text              |  node
---+-------------------------------+-----------------
 0 | megabits embanks              | v_vmart_node0001
 1 | unneatly                      | v_vmart_node0001
 2 | self-precipitation            | v_vmart_node0001
 3 | antihistamine scalados Vatter | v_vmart_node0001
 4 | fate-menaced toilworn         | v_vmart_node0001
(5 rows)



=> TRUNCATE TABLE t;
TRUNCATE TABLE
=> -- Now load a file from three hosts. All of these hosts must have a file
=> -- named /tmp/UDLdata01.txt, each with different data
=> COPY t SOURCE File(file='/tmp/UDLdata01.txt',
-> nodes='v_vmart_node0001,v_vmart_node0002,v_vmart_node0003');
 Rows Loaded
-------------
        3000
(1 row)

=> --Now see what has been loaded
=> SELECT * FROM t WHERE i < 5 ORDER BY i,node ;
 i |                      text                       |  node
---+-------------------------------------------------+--------
 0 | megabits embanks                                | v_vmart_node0001
 0 | nimble-eyed undupability frowsier               | v_vmart_node0002
 0 | Circean nonrepellence nonnasality               | v_vmart_node0003
 1 | unneatly                                        | v_vmart_node0001
 1 | floatmaker trabacolos hit-in                    | v_vmart_node0002
 1 | revelrous treatableness Halleck                 | v_vmart_node0003
 2 | self-precipitation                              | v_vmart_node0001
 2 | whipcords archipelagic protodonatan copycutter  | v_vmart_node0002
 2 | Paganalian geochemistry short-shucks            | v_vmart_node0003
 3 | antihistamine scalados Vatter                   | v_vmart_node0001
 3 | swordweed touristical subcommanders desalinized | v_vmart_node0002
 3 | batboys                                         | v_vmart_node0003
 4 | fate-menaced toilworn                           | v_vmart_node0001
 4 | twice-wanted cirrocumulous                      | v_vmart_node0002
 4 | doon-head-clock                                 | v_vmart_node0003
(15 rows)

=> TRUNCATE TABLE t;
TRUNCATE TABLE
=> --Now copy from several files on several hosts
=> COPY t SOURCE File(file='/tmp/UDLdata01.txt,/tmp/UDLdata02.txt,/tmp/UDLdata03.txt'
-> ,nodes='v_vmart_node0001,v_vmart_node0002,v_vmart_node0003');
 Rows Loaded
-------------
        9000
(1 row)

=> SELECT * FROM t WHERE i = 0 ORDER BY node ;
 i |                    text                     |  node
---+---------------------------------------------+--------
 0 | Awolowo Mirabilis D'Amboise                 | v_vmart_node0001
 0 | sortieing Divisionism selfhypnotization     | v_vmart_node0001
 0 | megabits embanks                            | v_vmart_node0001
 0 | nimble-eyed undupability frowsier           | v_vmart_node0002
 0 | thiaminase hieroglypher derogated soilborne | v_vmart_node0002
 0 | aurigraphy crocket stenocranial             | v_vmart_node0002
 0 | Khulna pelmets                              | v_vmart_node0003
 0 | Circean nonrepellence nonnasality           | v_vmart_node0003
 0 | matterate protarsal                         | v_vmart_node0003
(9 rows)

Parser implementation

The following code shows the source of the FileSource class that reads a file from the host file system. The constructor, which is called by FileSourceFactory.prepareUDSources(), gets the absolute path for the file containing the data to be read. The setup() method opens the file and the destroy() method closes it. The process() method reads from the file into a buffer provided by the instance of the DataBuffer class passed to it as a parameter. If the read operation filled the output buffer, it returns OUTPUT_NEEDED. This value tells Vertica to call the method again after the next stage of the load has processed the output buffer. If the read did not fill the output buffer, then process() returns DONE to indicate it has finished processing the data source.

package com.mycompany.UDL;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.io.RandomAccessFile;

import com.vertica.sdk.DataBuffer;
import com.vertica.sdk.ServerInterface;
import com.vertica.sdk.State.StreamState;
import com.vertica.sdk.UDSource;
import com.vertica.sdk.UdfException;

public class FileSource extends UDSource {

    private String filename;  // The file for this UDSource to read
    private RandomAccessFile reader;   // handle to read from file


    // The constructor just stores the absolute filename of the file it will
    // read.
    public FileSource(String filename) {
        super();
        this.filename = filename;
    }

    // Called before Vertica starts requesting data from the data source.
    // In this case, setup needs to open the file and save to the reader
    // property.
    @Override
    public void setup(ServerInterface srvInterface ) throws UdfException{
        try {
            reader = new RandomAccessFile(new File(filename), "r");
        } catch (FileNotFoundException e) {
            // In case of any error, throw a UDfException. This will terminate
            // the data load.
             String msg = e.getMessage();
             throw new UdfException(0, msg);
        }
    }

    // Called after data has been loaded. In this case, close the file handle.
    @Override
    public void destroy(ServerInterface srvInterface ) throws UdfException {
        if (reader != null) {
            try {
                reader.close();
            } catch (IOException e) {
                String msg = e.getMessage();
                 throw new UdfException(0, msg);
            }
        }
    }

    @Override
    public StreamState process(ServerInterface srvInterface, DataBuffer output)
                                throws UdfException {

        // Read up to the size of the buffer provided in the DataBuffer.buf
        // property. Here we read directly from the file handle into the
        // buffer.
        long offset;
        try {
            offset = reader.read(output.buf,output.offset,
                                 output.buf.length-output.offset);
        } catch (IOException e) {
            // Throw an exception in case of any errors.
            String msg = e.getMessage();
            throw new UdfException(0, msg);
        }

        // Update the number of bytes processed so far by the data buffer.
        output.offset +=offset;

        // See end of data source has been reached, or less data was read
        // than can fit in the buffer
        if(offset == -1 || offset < output.buf.length) {
            // No more data to read.
            return StreamState.DONE;
        }else{
            // Tell Vertica to call again when buffer has been emptied
            return StreamState.OUTPUT_NEEDED;
        }
    }
}

Factory implementation

The following code is a modified version of the example Java UDsource function provided in the Java UDx support package. You can find the full example in /opt/vertica/sdk/examples/JavaUDx/UDLFuctions/com/vertica/JavaLibs/FileSourceFactory.java. Its override of the plan() method verifies that the user supplied the required file parameter. If the user also supplied the optional nodes parameter, this method verifies that the nodes exist in the Vertica cluster. If there is a problem with either parameter, the method throws an exception to return an error to the user. If there are no issues with the parameters, the plan() method stores their values in the plan context object.

package com.mycompany.UDL;

import java.util.ArrayList;
import java.util.Vector;
import com.vertica.sdk.NodeSpecifyingPlanContext;
import com.vertica.sdk.ParamReader;
import com.vertica.sdk.ParamWriter;
import com.vertica.sdk.ServerInterface;
import com.vertica.sdk.SizedColumnTypes;
import com.vertica.sdk.SourceFactory;
import com.vertica.sdk.UDSource;
import com.vertica.sdk.UdfException;

public class FileSourceFactory extends SourceFactory {

    // Called once on the initiator host to do initial setup. Checks
    // parameters and chooses which nodes will do the work.
    @Override
    public void plan(ServerInterface srvInterface,
            NodeSpecifyingPlanContext planCtxt) throws UdfException {

        String nodes; // stores the list of nodes that will load data

        // Get  copy of the parameters the user supplied to the UDSource
        // function call.
        ParamReader args =  srvInterface.getParamReader();

        // A list of nodes that will perform work. This gets saved as part
        // of the plan context.
        ArrayList<String> executionNodes = new ArrayList<String>();

        // First, ensure the user supplied the file parameter
        if (!args.containsParameter("file")) {
            // Withut a file parameter, we cannot continue. Throw an
            // exception that will be caught by the Java UDx framework.
            throw new UdfException(0, "You must supply a file parameter");
        }

        // If the user specified nodes to read the file, parse the
        // comma-separated list and save. Otherwise, assume just the
        // Initiator node has the file to read.
        if (args.containsParameter("nodes")) {
            nodes = args.getString("nodes");

            // Get list of nodes in cluster, to ensure that the node the
            // user specified actually exists. The list of nodes is available
            // from the planCTxt (plan context) object,
            ArrayList<String> clusterNodes = planCtxt.getClusterNodes();

            // Parse the string parameter "nodes" which
            // is a comma-separated list of node names.
            String[] nodeNames = nodes.split(",");

            for (int i = 0; i < nodeNames.length; i++){
                // See if the node the user gave us actually exists
                if(clusterNodes.contains(nodeNames[i]))
                    // Node exists. Add it to list of nodes.
                    executionNodes.add(nodeNames[i]);
                else{
                    // User supplied node that doesn't exist. Throw an
                    // exception so the user is notified.
                    String msg = String.format("Specified node '%s' but no" +
                        " node by that name is available.  Available nodes "
                        + "are \"%s\".",
                        nodeNames[i], clusterNodes.toString());
                    throw new UdfException(0, msg);
                }
            }
        } else {
            // User did not supply a list of node names. Assume the initiator
            // is the only host that will read the file. The srvInterface
            // instance passed to this method has a getter for the current
            // node.
            executionNodes.add(srvInterface.getCurrentNodeName());
        }

        // Set the target node(s) in the plan context
        planCtxt.setTargetNodes(executionNodes);

        // Set parameters for each node reading data that tells it which
        // files it will read. In this simple example, just tell it to
        // read all of the files the user passed in the file parameter
        String files = args.getString("file");

        // Get object to write parameters into the plan context object.
        ParamWriter nodeParams = planCtxt.getWriter();

        // Loop through list of execution nodes, and add a parameter to plan
        // context named for each node performing the work, which tells it the
        // list of files it will process. Each node will look for a
        // parameter named something like "filesForv_vmart_node0002" in its
        // prepareUDSources() method.
        for (int i = 0; i < executionNodes.size(); i++) {
            nodeParams.setString("filesFor" + executionNodes.get(i), files);
        }
    }

    // Called on each host that is reading data from a source. This method
    // returns an array of UDSource objects that process each source.
    @Override
    public ArrayList<UDSource> prepareUDSources(ServerInterface srvInterface,
            NodeSpecifyingPlanContext planCtxt) throws UdfException {

        // An array to hold the UDSource subclasses that we instaniate
        ArrayList<UDSource> retVal = new ArrayList<UDSource>();

        // Get the list of files this node is supposed to process. This was
        // saved by the plan() method in the plancontext
        String myName = srvInterface.getCurrentNodeName();
        ParamReader params = planCtxt.getReader();
        String fileNames = params.getString("filesFor" + myName);

        // Note that you can also be lazy and directly grab the parameters
        // the user passed to the UDSource functon in the COPY statement directly
        // by getting parameters from the ServerInterface object. I.e.:

        //String fileNames = srvInterface.getParamReader().getString("file");

        // Split comma-separated list into a single list.
        String[] fileList = fileNames.split(",");
        for (int i = 0; i < fileList.length; i++){
            // Instantiate a FileSource object (which is a subclass of UDSource)
            // to read each file. The constructor for FileSource takes the
            // file name of the
            retVal.add(new FileSource(fileList[i]));
        }

        // Return the collection of FileSource objects. They will be called,
        // in turn, to read each of the files.
        return retVal;
    }

    // Declares which parameters that this factory accepts.
    @Override
    public void getParameterType(ServerInterface srvInterface,
                                    SizedColumnTypes parameterTypes) {
        parameterTypes.addVarchar(65000, "file");
        parameterTypes.addVarchar(65000, "nodes");
    }
}