Java Example: FileSource

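The example shown in this section is a simple UDL source function named FileSource. This function loads data from files stored on the host's file system (similar to the standard COPY statement). To call FileSource, you must supply a parameter named file that contains the absolute path to one or more files on the host file system. You can specify multiple files as a comma-separated list.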

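The FileSource function also accepts an optional parameter, named nodes, that indicates which nodes should load the files. If you do not supply this parameter, the function loads data on the initiator node only. Because this example is deliberately simple, each node loads files only from its own file system. Every file named in the file parameter must therefore exist on every host named in the nodes parameter. The FileSource UDSource attempts to load all of the files in the file parameter on all of the hosts in the nodes parameter.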
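
As a rough illustration of these semantics, the following sketch expands the two parameters into the individual loads they imply. It is not part of the example library: the class name LoadPlanSketch, the method expand(), and the initiator argument are hypothetical names used only for illustration, and the sketch assumes that an omitted nodes parameter is represented by null.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of the Vertica SDK or the FileSource example.
class LoadPlanSketch {

    // Expands the file and nodes parameters into "node:file" pairs.
    // A null nodesParam stands for "parameter not supplied", in which
    // case only the initiator node loads data.
    static List<String> expand(String fileParam, String nodesParam,
                               String initiator) {
        String[] files = fileParam.split(",");
        String[] nodes = (nodesParam == null)
                ? new String[] { initiator }
                : nodesParam.split(",");

        List<String> loads = new ArrayList<String>();
        for (String node : nodes) {
            for (String file : files) {
                // Every listed node tries to load every listed file.
                loads.add(node.trim() + ":" + file.trim());
            }
        }
        return loads;
    }

    public static void main(String[] args) {
        List<String> loads = expand(
                "/tmp/UDLdata01.txt,/tmp/UDLdata02.txt",
                "v_vmart_node0001,v_vmart_node0002",
                "v_vmart_node0001");
        for (String load : loads) {
            System.out.println(load);
        }
    }
}
```

Because every node is paired with every file, the number of loads is the number of nodes multiplied by the number of files, which is why each file must exist at the same path on every listed host.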


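You can use the following Python script to generate files and distribute them to the hosts in your Vertica cluster. With these files, you can experiment with the example UDSource function. The script copies the files to the other hosts over SSH, so it requires passwordless SSH logins. For that reason, you must run the script using the database administrator account on one of your database hosts.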

# Save this file as
import string
import random
import sys
import os

# Read in the dictionary file to provide random words. Assumes the words
# file is located in /usr/share/dict/words
wordFile = open("/usr/share/dict/words")
wordDict = []
for line in wordFile:
    if len(line) > 6:
        wordDict.append(line.strip())

MAXSTR = 4 # Maximum number of words to concatenate
NUMROWS = 1000 # Number of rows of data to generate
#FILEPATH = '/tmp/UDLdata.txt' # Final filename to use for UDL source
TMPFILE = '/tmp/UDLtemp.txt'  # Temporary filename.

# Generate a random string by concatenating several words together. Max
# number of words set by MAXSTR
def randomWords():
    words = [random.choice(wordDict) for n in range(random.randint(1, MAXSTR))]
    sentence = " ".join(words)
    return sentence

# Create a temporary data file that will be moved to a node. Number of
# rows for the file is set by NUMROWS. Adds the name of the node which will
# get the file, to show which node loaded the data.
def generateFile(node):
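    with open(TMPFILE, 'w') as outFile:
        for line in range(NUMROWS):
            # One row per line: index|random words|node name
            outFile.write('{0}|{1}|{2}\n'.format(line, randomWords(), node))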

# Copy the temporary file to a node. Only works if passwordless SSH login
# is enabled, which it is for the database administrator account on
# Vertica hosts.
def copyFile(fileName,node):
    os.system('scp "%s" "%s:%s"' % (TMPFILE, node, fileName) )

# Loop through the comma-separated list of nodes given in the first
# parameter, creating and copying data files whose full comma-separated
# paths are passed in the second parameter
for node in [x.strip() for x in sys.argv[1].split(',')]:
    for fileName in [y.strip() for y in sys.argv[2].split(',')]:
        print("generating file %s for %s" % (fileName, node))
        generateFile(node)
        print("Copying file to %s" % node)
        copyFile(fileName, node)


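Call the script with two arguments: a comma-separated list of the hosts that receive the files, and a comma-separated list of the absolute paths of the files to generate. For example:

python v_vmart_node0001,v_vmart_node0002,v_vmart_node0003 /tmp/UDLdata01.txt,/tmp/UDLdata02.txt,\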

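The script generates files that each contain one thousand rows, with columns delimited by the pipe character (|). The columns contain an index value, a set of random words, and the name of the node for which the file was generated, as shown in the following output sample: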

0|megabits embanks|v_vmart_node0001
3|antihistamine scalados Vatter|v_vmart_node0001
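
To make the row layout concrete, here is a minimal sketch that splits one generated line into its three columns. The class and method names (RowSketch, parse) are hypothetical, and the sketch assumes that the random words never contain a pipe character, which should hold for ordinary dictionary words.

```java
// Hypothetical illustration of the row format; not part of the example library.
class RowSketch {

    // Splits "index|words|node" into exactly three fields.
    static String[] parse(String line) {
        // The pipe is a regex metacharacter, so it must be escaped.
        String[] fields = line.split("\\|", -1);
        if (fields.length != 3) {
            throw new IllegalArgumentException(
                    "Expected 3 columns but found " + fields.length);
        }
        return fields;
    }

    public static void main(String[] args) {
        String[] f = parse("3|antihistamine scalados Vatter|v_vmart_node0001");
        long index = Long.parseLong(f[0]);   // first column is numeric
        System.out.println(index + " / " + f[1] + " / " + f[2]);
    }
}
```

The three fields map naturally onto an integer column followed by two character columns.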


The following example demonstrates loading and using the FileSource UDSource:

=> --Load library and create the source function
=> CREATE LIBRARY JavaLib AS '/home/dbadmin/JavaUDlLib.jar'
-> LANGUAGE 'JAVA';
=> CREATE SOURCE File AS LANGUAGE 'JAVA' NAME
-> 'com.mycompany.UDL.FileSourceFactory' LIBRARY JavaLib;
=> --Create a table to hold the data loaded from files
=> CREATE TABLE t (i integer, text VARCHAR, node VARCHAR);
=> -- Copy a single file from the current host using the FileSource
=> COPY t SOURCE File(file='/tmp/UDLdata01.txt');
 Rows Loaded
-------------
        1000
(1 row)

=> --See some of what got loaded.
=> SELECT * FROM t WHERE i < 5 ORDER BY i;
 i |             text              |  node
---+-------------------------------+------------------
 0 | megabits embanks              | v_vmart_node0001
 1 | unneatly                      | v_vmart_node0001
 2 | self-precipitation            | v_vmart_node0001
 3 | antihistamine scalados Vatter | v_vmart_node0001
 4 | fate-menaced toilworn         | v_vmart_node0001
(5 rows)

=> TRUNCATE TABLE t;

=> -- Now load a file from three hosts. All of these hosts must have a file
=> -- named /tmp/UDLdata01.txt, each with different data
=> COPY t SOURCE File(file='/tmp/UDLdata01.txt',
-> nodes='v_vmart_node0001,v_vmart_node0002,v_vmart_node0003');
 Rows Loaded
-------------
        3000
(1 row)

=> --Now see what has been loaded
=> SELECT * FROM t WHERE i < 5 ORDER BY i,node ;
 i |                      text                       |  node
---+-------------------------------------------------+------------------
 0 | megabits embanks                                | v_vmart_node0001
 0 | nimble-eyed undupability frowsier               | v_vmart_node0002
 0 | Circean nonrepellence nonnasality               | v_vmart_node0003
 1 | unneatly                                        | v_vmart_node0001
 1 | floatmaker trabacolos hit-in                    | v_vmart_node0002
 1 | revelrous treatableness Halleck                 | v_vmart_node0003
 2 | self-precipitation                              | v_vmart_node0001
 2 | whipcords archipelagic protodonatan copycutter  | v_vmart_node0002
 2 | Paganalian geochemistry short-shucks            | v_vmart_node0003
 3 | antihistamine scalados Vatter                   | v_vmart_node0001
 3 | swordweed touristical subcommanders desalinized | v_vmart_node0002
 3 | batboys                                         | v_vmart_node0003
 4 | fate-menaced toilworn                           | v_vmart_node0001
 4 | twice-wanted cirrocumulous                      | v_vmart_node0002
 4 | doon-head-clock                                 | v_vmart_node0003
(15 rows)

=> TRUNCATE TABLE t;

=> --Now copy from several files on several hosts
=> COPY t SOURCE File(file='/tmp/UDLdata01.txt,/tmp/UDLdata02.txt,/tmp/UDLdata03.txt'
-> ,nodes='v_vmart_node0001,v_vmart_node0002,v_vmart_node0003');
 Rows Loaded
-------------
        9000
(1 row)

=> SELECT * FROM t WHERE i = 0 ORDER BY node ;
 i |                    text                     |  node
---+---------------------------------------------+------------------
 0 | Awolowo Mirabilis D'Amboise                 | v_vmart_node0001
 0 | sortieing Divisionism selfhypnotization     | v_vmart_node0001
 0 | megabits embanks                            | v_vmart_node0001
 0 | nimble-eyed undupability frowsier           | v_vmart_node0002
 0 | thiaminase hieroglypher derogated soilborne | v_vmart_node0002
 0 | aurigraphy crocket stenocranial             | v_vmart_node0002
 0 | Khulna pelmets                              | v_vmart_node0003
 0 | Circean nonrepellence nonnasality           | v_vmart_node0003
 0 | matterate protarsal                         | v_vmart_node0003
(9 rows)


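The following code shows the source of the FileSource class, which reads a file from the host file system. The constructor, which is called by FileSourceFactory.prepareUDSources(), receives the absolute path of the file that contains the data to read. The setup() method opens the file, and the destroy() method closes it. The process() method reads data from the file and writes it into a buffer provided by the instance of the DataBuffer class that is passed to it as a parameter. If the read operation fills the output buffer, the method returns OUTPUT_NEEDED. This value tells Vertica to call the method again after the next stage of the load has processed the output buffer. If the read operation does not fill the output buffer, process() returns DONE to indicate that it has finished processing the data source.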

package com.mycompany.UDL;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.RandomAccessFile;

import com.vertica.sdk.DataBuffer;
import com.vertica.sdk.ServerInterface;
import com.vertica.sdk.State.StreamState;
import com.vertica.sdk.UDSource;
import com.vertica.sdk.UdfException;

public class FileSource extends UDSource {

    private String filename;  // The file for this UDSource to read
    private RandomAccessFile reader;   // handle to read from file

    // The constructor just stores the absolute filename of the file it will
    // read.
    public FileSource(String filename) {
        this.filename = filename;
    }

    // Called before Vertica starts requesting data from the data source.
    // In this case, setup needs to open the file and save to the reader
    // property.
    public void setup(ServerInterface srvInterface ) throws UdfException{
        try {
            reader = new RandomAccessFile(new File(filename), "r");
        } catch (FileNotFoundException e) {
            // In case of any error, throw a UdfException. This will terminate
            // the data load.
            String msg = e.getMessage();
            throw new UdfException(0, msg);
        }
    }

    // Called after data has been loaded. In this case, close the file handle.
    public void destroy(ServerInterface srvInterface ) throws UdfException {
        if (reader != null) {
            try {
                reader.close();
            } catch (IOException e) {
                String msg = e.getMessage();
                throw new UdfException(0, msg);
            }
        }
    }

    // Called by Vertica to read data from the source.
    public StreamState process(ServerInterface srvInterface, DataBuffer output)
                                throws UdfException {

        // Read up to the size of the buffer provided in the DataBuffer.buf
        // property. Here we read directly from the file handle into the
        // buffer.
        long offset;
        try {
            offset = reader.read(output.buf, output.offset,
                                 output.buf.length - output.offset);
        } catch (IOException e) {
            // Throw an exception in case of any errors.
            String msg = e.getMessage();
            throw new UdfException(0, msg);
        }

        // Update the number of bytes processed so far by the data buffer.
        // read() returns -1 at end of file, so only add positive counts.
        if (offset > 0) {
            output.offset += offset;
        }

        // See whether the end of the data source has been reached, or less
        // data was read than can fit in the buffer
        if (offset == -1 || offset < output.buf.length) {
            // No more data to read.
            return StreamState.DONE;
        } else {
            // Tell Vertica to call again when buffer has been emptied
            return StreamState.OUTPUT_NEEDED;
        }
    }
}

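The following code is a modified version of the example Java UDSource function provided in the Java UDx support package. You can find the full example in /opt/vertica/sdk/examples/JavaUDx/UDLFuctions/com/vertica/JavaLibs/. Its override of the plan() method verifies that the user supplied the required file parameter. If the user also supplied the optional nodes parameter, the method verifies that the nodes exist in the Vertica cluster. If there is a problem with either parameter, the method throws an exception to return an error to the user. If there are no issues with either parameter, the plan() method stores their values in the plan context object.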

package com.mycompany.UDL;

import java.util.ArrayList;
import com.vertica.sdk.NodeSpecifyingPlanContext;
import com.vertica.sdk.ParamReader;
import com.vertica.sdk.ParamWriter;
import com.vertica.sdk.ServerInterface;
import com.vertica.sdk.SizedColumnTypes;
import com.vertica.sdk.SourceFactory;
import com.vertica.sdk.UDSource;
import com.vertica.sdk.UdfException;

public class FileSourceFactory extends SourceFactory {

    // Called once on the initiator host to do initial setup. Checks
    // parameters and chooses which nodes will do the work.
    public void plan(ServerInterface srvInterface,
            NodeSpecifyingPlanContext planCtxt) throws UdfException {

        String nodes; // stores the list of nodes that will load data

        // Get a copy of the parameters the user supplied to the UDSource
        // function call.
        ParamReader args = srvInterface.getParamReader();

        // A list of nodes that will perform work. This gets saved as part
        // of the plan context.
        ArrayList<String> executionNodes = new ArrayList<String>();

        // First, ensure the user supplied the file parameter
        if (!args.containsParameter("file")) {
            // Without a file parameter, we cannot continue. Throw an
            // exception that will be caught by the Java UDx framework.
            throw new UdfException(0, "You must supply a file parameter");
        }

        // If the user specified nodes to read the file, parse the
        // comma-separated list and save. Otherwise, assume just the
        // Initiator node has the file to read.
        if (args.containsParameter("nodes")) {
            nodes = args.getString("nodes");

            // Get list of nodes in cluster, to ensure that the node the
            // user specified actually exists. The list of nodes is available
            // from the planCtxt (plan context) object.
            ArrayList<String> clusterNodes = planCtxt.getClusterNodes();

            // Parse the string parameter "nodes" which
            // is a comma-separated list of node names.
            String[] nodeNames = nodes.split(",");

            for (int i = 0; i < nodeNames.length; i++){
                // See if the node the user gave us actually exists
                if (clusterNodes.contains(nodeNames[i])) {
                    // Node exists. Add it to list of nodes.
                    executionNodes.add(nodeNames[i]);
                } else {
                    // User supplied node that doesn't exist. Throw an
                    // exception so the user is notified.
                    String msg = String.format("Specified node '%s' but no" +
                        " node by that name is available.  Available nodes "
                        + "are \"%s\".",
                        nodeNames[i], clusterNodes.toString());
                    throw new UdfException(0, msg);
                }
            }
        } else {
            // User did not supply a list of node names. Assume the initiator
            // is the only host that will read the file. The srvInterface
            // instance passed to this method has a getter for the current
            // node.
            executionNodes.add(srvInterface.getCurrentNodeName());
        }

        // Set the target node(s) in the plan context
        planCtxt.setTargetNodes(executionNodes);

        // Set parameters for each node reading data that tells it which
        // files it will read. In this simple example, just tell it to
        // read all of the files the user passed in the file parameter
        String files = args.getString("file");

        // Get object to write parameters into the plan context object.
        ParamWriter nodeParams = planCtxt.getWriter();

        // Loop through list of execution nodes, and add a parameter to plan
        // context named for each node performing the work, which tells it the
        // list of files it will process. Each node will look for a
        // parameter named something like "filesForv_vmart_node0002" in its
        // prepareUDSources() method.
        for (int i = 0; i < executionNodes.size(); i++) {
            nodeParams.setString("filesFor" + executionNodes.get(i), files);
        }
    }

    // Called on each host that is reading data from a source. This method
    // returns an array of UDSource objects that process each source.
    public ArrayList<UDSource> prepareUDSources(ServerInterface srvInterface,
            NodeSpecifyingPlanContext planCtxt) throws UdfException {

        // An array to hold the UDSource subclasses that we instantiate
        ArrayList<UDSource> retVal = new ArrayList<UDSource>();

        // Get the list of files this node is supposed to process. This was
        // saved by the plan() method in the plan context
        String myName = srvInterface.getCurrentNodeName();
        ParamReader params = planCtxt.getReader();
        String fileNames = params.getString("filesFor" + myName);

        // Note that you can also be lazy and grab the parameters the user
        // passed to the UDSource function in the COPY statement directly
        // by getting parameters from the ServerInterface object. I.e.:

        //String fileNames = srvInterface.getParamReader().getString("file");

        // Split the comma-separated list into individual file names.
        String[] fileList = fileNames.split(",");
        for (int i = 0; i < fileList.length; i++){
            // Instantiate a FileSource object (which is a subclass of UDSource)
            // to read each file. The constructor for FileSource takes the
            // name of the file to read.
            retVal.add(new FileSource(fileList[i]));
        }

        // Return the collection of FileSource objects. They will be called,
        // in turn, to read each of the files.
        return retVal;
    }

    // Declares which parameters this factory accepts.
    public void getParameterType(ServerInterface srvInterface,
                                    SizedColumnTypes parameterTypes) {
        parameterTypes.addVarchar(65000, "file");
        parameterTypes.addVarchar(65000, "nodes");
    }
}
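
The handoff between plan() and prepareUDSources() can also be exercised in isolation. In the following sketch, a plain HashMap stands in for the plan context, and the class and method names (PlanHandoffSketch, plan, filesForNode) are hypothetical. Only the "filesFor" plus node name key convention and the node validation step come from the factory code. Unlike the factory, the sketch trims whitespace around each name, which is an added convenience rather than documented behavior.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Stand-alone sketch: a HashMap plays the role of the plan context.
// None of these names belong to the Vertica SDK.
class PlanHandoffSketch {

    // Initiator side: validate the requested nodes, then store one
    // "filesFor<node>" entry per node that will do the work.
    static Map<String, String> plan(String files, String nodes,
                                    List<String> clusterNodes,
                                    String initiator) {
        List<String> executionNodes = new ArrayList<String>();
        if (nodes == null) {
            // No nodes parameter: only the initiator reads the files.
            executionNodes.add(initiator);
        } else {
            for (String name : nodes.split(",")) {
                String node = name.trim();
                if (!clusterNodes.contains(node)) {
                    throw new IllegalArgumentException(
                            "Specified node '" + node + "' but no node by "
                            + "that name is available. Available nodes are "
                            + clusterNodes);
                }
                executionNodes.add(node);
            }
        }
        Map<String, String> context = new HashMap<String, String>();
        for (String node : executionNodes) {
            context.put("filesFor" + node, files);
        }
        return context;
    }

    // Executor side: each node looks up its own entry and splits it.
    static List<String> filesForNode(Map<String, String> context,
                                     String myName) {
        String fileNames = context.get("filesFor" + myName);
        List<String> result = new ArrayList<String>();
        if (fileNames != null) {
            for (String f : fileNames.split(",")) {
                result.add(f.trim());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> cluster = Arrays.asList(
                "v_vmart_node0001", "v_vmart_node0002", "v_vmart_node0003");
        Map<String, String> ctx = plan(
                "/tmp/UDLdata01.txt,/tmp/UDLdata02.txt",
                "v_vmart_node0001,v_vmart_node0002",
                cluster, "v_vmart_node0001");
        System.out.println(filesForNode(ctx, "v_vmart_node0002"));
    }
}
```

Keying the entries by node name leaves room for a more selective plan in which each node receives a different file list, even though this example gives every node the same one.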