Transform functions (UDTFs)

A user-defined transform function (UDTF) lets you transform a table of data into another table. It reads one or more arguments (treated as a row of data), and returns zero or more rows of data consisting of one or more columns. A UDTF can produce any number of rows as output. However, each row it outputs must be complete. Advancing to the next row without having added a value for each column produces incorrect results.

The schema of the output table does not need to correspond to the schema of the input table—they can be totally different. The UDTF can return any number of output rows for each row of input.

A UDTF can be used only in a SELECT list that contains just the UDTF call and a required OVER clause. A multi-phase UDTF can make use of partition columns (PARTITION BY), but other UDTFs cannot.

When used with GROUP BY and ORDER BY in a statement, UDTFs run after the GROUP BY but before the final ORDER BY. The ORDER BY clause may contain only columns or expressions that are in a window partition clause (see Window partitioning).

UDTFs can take up to 9800 parameters (input columns). Attempts to pass more parameters to a UDTF return an error.

1 - TransformFunction class

The TransformFunction class is where you perform the data processing, transforming input rows into output rows. Your subclass must define the processPartition() method. It may also define methods to set up and tear down the function.

Performing the transformation

The processPartition() method carries out all of the processing that you want your UDTF to perform. When a user calls your function in a SQL statement, Vertica bundles together the data from the function parameters and passes it to processPartition().

The input and output of the processPartition() method are supplied by objects of the PartitionReader and PartitionWriter classes. They define methods that you use to read the input data and write the output data for your UDTF.

A UDTF does not necessarily operate on a single row the way a UDSF does. A UDTF can read any number of rows and write output at any time.

Consider the following guidelines when implementing processPartition():

  • Extract the input parameters by calling data-type-specific functions on the PartitionReader object. Each of these functions takes a single parameter: the column number in the input row that you want to read. Your function might need to handle NULL values.

  • When writing output, your UDTF must supply values for all of the output columns you defined in your factory. Similarly to reading input columns, the PartitionWriter object has functions for writing each type of data to the output row.

  • Use PartitionReader.next() to determine if there is more input to process, and exit when the input is exhausted.

  • In some cases, you might want to determine the number and types of parameters using PartitionReader's getNumCols() and getTypeMetaData() functions, instead of just hard-coding the data types of the columns in the input row. This is useful if you want your TransformFunction to be able to process input tables with different schemas. You can then use different TransformFunctionFactory classes to define multiple function signatures that call the same TransformFunction class. See Overloading your UDx for more information.
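
A minimal C++ sketch that puts these guidelines together (a hypothetical pass-through UDTF, assuming a one-VARCHAR-in, one-VARCHAR-out signature):

class PassThrough : public TransformFunction
{
  virtual void processPartition(ServerInterface &srvInterface,
                                PartitionReader &inputReader,
                                PartitionWriter &outputWriter)
  {
    do {
      const VString &value = inputReader.getStringRef(0); // read input column 0
      VString &out = outputWriter.getStringRef(0);        // output column 0
      if (value.isNull())
        out.setNull();         // handle NULL input explicitly
      else
        out.copy(value.str()); // supply a value for every output column
      outputWriter.next();     // complete the output row before advancing
    } while (inputReader.next() && !isCanceled()); // exit when input is exhausted
  }
};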

Setting up and tearing down

The TransformFunction class defines two additional methods that you can optionally implement to allocate and free resources: setup() and destroy(). You should use these methods to allocate and deallocate resources that you do not allocate through the UDx API (see Allocating resources for UDxs for details).
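
For example, a minimal sketch of this pair in C++, assuming a hypothetical buffer allocated outside the UDx API:

class BufferedTransform : public TransformFunction
{
  char *scratch; // hypothetical resource not managed by the UDx allocator

  virtual void setup(ServerInterface &srvInterface,
                     const SizedColumnTypes &argTypes)
  {
    scratch = new char[1024]; // acquire before processPartition() runs
  }

  virtual void destroy(ServerInterface &srvInterface,
                       const SizedColumnTypes &argTypes)
  {
    delete[] scratch; // release whatever setup() acquired
    scratch = NULL;
  }

  virtual void processPartition(ServerInterface &srvInterface,
                                PartitionReader &inputReader,
                                PartitionWriter &outputWriter)
  {
    // ... process rows, using scratch as working space ...
  }
};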

API

In C++, the TransformFunction API provides the following methods for extension by subclasses:

virtual void setup(ServerInterface &srvInterface,
        const SizedColumnTypes &argTypes);

virtual void processPartition(ServerInterface &srvInterface,
        PartitionReader &input_reader, PartitionWriter &output_writer)=0;

virtual void cancel(ServerInterface &srvInterface);

virtual void destroy(ServerInterface &srvInterface,
        const SizedColumnTypes &argTypes);

The PartitionReader and PartitionWriter classes provide getters and setters for column values, along with next() to iterate through partitions. See the API reference documentation for details.

In Java, the TransformFunction API provides the following methods for extension by subclasses:

public void setup(ServerInterface srvInterface, SizedColumnTypes argTypes);

public abstract void processPartition(ServerInterface srvInterface,
        PartitionReader input_reader, PartitionWriter output_writer)
    throws UdfException, DestroyInvocation;

protected void cancel(ServerInterface srvInterface);

public void destroy(ServerInterface srvInterface, SizedColumnTypes argTypes);

The PartitionReader and PartitionWriter classes provide getters and setters for column values, along with next() to iterate through partitions. See the API reference documentation for details.

In Python, the TransformFunction API provides the following methods for extension by subclasses:


def setup(self, server_interface, col_types)

def processPartition(self, server_interface, partition_reader, partition_writer)

def destroy(self, server_interface, col_types)

The PartitionReader and PartitionWriter classes provide getters and setters for column values, along with next() to iterate through partitions. See the API reference documentation for details.

In R, implement the Main function API to define a transform function:

FunctionName <- function(input.data.frame, parameters.data.frame) {
  # Computations

  # The function must return a data frame.
  return(output.data.frame)
}

2 - TransformFunctionFactory class

The TransformFunctionFactory class provides Vertica with metadata about your UDTF: the number of parameters and their data types, as well as the data type of its return value. It also instantiates a subclass of TransformFunction.

You must implement the following methods in your TransformFunctionFactory:

  • getPrototype() returns two ColumnTypes objects that describe the columns your UDTF takes as input and returns as output.

  • getReturnType() tells Vertica details about the output values: the width of variable-sized data types (such as VARCHAR) and the precision of data types that have settable precision (such as TIMESTAMP). You can also set the names of the output columns using this function. While this method is optional for UDxs that return single values, you must implement it for UDTFs.

  • createTransformFunction() instantiates your TransformFunction subclass.
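
Putting the three methods together, a minimal C++ factory sketch for the hypothetical pass-through function shown earlier (a complete, working factory appears in C++ example: string tokenizer):

class PassThroughFactory : public TransformFunctionFactory
{
  // One VARCHAR input column, one VARCHAR output column.
  virtual void getPrototype(ServerInterface &srvInterface,
                            ColumnTypes &argTypes, ColumnTypes &returnType)
  {
    argTypes.addVarchar();
    returnType.addVarchar();
  }

  // Size and name the output column; output is never wider than the input.
  virtual void getReturnType(ServerInterface &srvInterface,
                             const SizedColumnTypes &inputTypes,
                             SizedColumnTypes &outputTypes)
  {
    outputTypes.addVarchar(inputTypes.getColumnType(0).getStringLength(), "out");
  }

  virtual TransformFunction *createTransformFunction(ServerInterface &srvInterface)
  { return vt_createFuncObject<PassThrough>(srvInterface.allocator); }
};

RegisterFactory(PassThroughFactory);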

For transform functions written in C++, you can provide information that can help with query optimization. See Improving query performance (C++ only).

API

In C++, the TransformFunctionFactory API provides the following methods for extension by subclasses:

virtual TransformFunction *
    createTransformFunction (ServerInterface &srvInterface)=0;

virtual void getPrototype(ServerInterface &srvInterface,
            ColumnTypes &argTypes, ColumnTypes &returnType)=0;

virtual void getReturnType(ServerInterface &srvInterface,
            const SizedColumnTypes &argTypes,
            SizedColumnTypes &returnType)=0;

virtual void getParameterType(ServerInterface &srvInterface,
            SizedColumnTypes &parameterTypes);

In Java, the TransformFunctionFactory API provides the following methods for extension by subclasses:

public abstract TransformFunction createTransformFunction(ServerInterface srvInterface);

public abstract void getPrototype(ServerInterface srvInterface, ColumnTypes argTypes, ColumnTypes returnType);

public abstract void getReturnType(ServerInterface srvInterface, SizedColumnTypes argTypes,
        SizedColumnTypes returnType) throws UdfException;

public void getParameterType(ServerInterface srvInterface, SizedColumnTypes parameterTypes);

In Python, the TransformFunctionFactory API provides the following methods for extension by subclasses:


def createTransformFunction(self, srv)

def getPrototype(self, srv_interface, arg_types, return_type)

def getReturnType(self, srv_interface, arg_types, return_type)

def getParameterType(self, server_interface, parameterTypes)

In R, implement the Factory function API to define a transform function factory:

FunctionNameFactory <- function() {
  list(name    = FunctionName,
       udxtype = c("transform"),
       intype  = c("int"),
       outtype = c("int"))
}

3 - MultiPhaseTransformFunctionFactory class

Multi-phase UDTFs let you break your data processing into multiple steps.

Multi-phase UDTFs let you break your data processing into multiple steps. Using this feature, your UDTFs can perform processing in a way similar to Hadoop or other MapReduce frameworks. You can use the first phase to break down and gather data, and then use subsequent phases to process the data. For example, the first phase of your UDTF could extract specific types of user interactions from a web server log stored in the column of a table, and subsequent phases could perform analysis on those interactions.

Multi-phase UDTFs also let you decide where processing should occur: locally on each node, or throughout the cluster. If your multi-phase UDTF is like a MapReduce process, you want the first phase of your multi-phase UDTF to process data that is stored locally on the node where the instance of the UDTF is running. This prevents large segments of data from being copied around the Vertica cluster. Depending on the type of processing being performed in later phases, you may choose to have the data segmented and distributed across the Vertica cluster.

Each phase of the UDTF is the same as a traditional (single-phase) UDTF: it receives a table as input, and generates a table as output. The schema for each phase's output does not have to match its input, and each phase can output as many or as few rows as it wants.

You create a subclass of TransformFunction to define the processing performed by each stage. If you already have a TransformFunction from a single-phase UDTF that performs the processing you want a phase of your multi-phase UDTF to perform, you can easily adapt it to work within the multi-phase UDTF.

What makes a multi-phase UDTF different from a traditional UDTF is the factory class you use. You define a multi-phase UDTF using a subclass of MultiPhaseTransformFunctionFactory, rather than the TransformFunctionFactory. This special factory class acts as a container for all of the phases in your multi-phase UDTF. It provides Vertica with the input and output requirements of the entire multi-phase UDTF (through the getPrototype() function), and a list of all the phases in the UDTF.

Within your subclass of the MultiPhaseTransformFunctionFactory class, you define one or more subclasses of TransformFunctionPhase. These classes fill the same role as the TransformFunctionFactory class for each phase in your multi-phase UDTF. They define the input and output of each phase and create instances of their associated TransformFunction classes to perform the processing for each phase of the UDTF. In addition to these subclasses, your MultiPhaseTransformFunctionFactory includes fields that provide a handle to an instance of each of the TransformFunctionPhase subclasses.
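
As a compact C++ sketch of this structure (assuming two hypothetical TransformFunction subclasses, LocalCount and GlobalCount, that each read and emit a single INTEGER column; a complete example appears in C++ example: multi-phase indexer):

class CountAllFactory : public MultiPhaseTransformFunctionFactory
{
public:
  class LocalPhase : public TransformFunctionPhase
  {
    virtual void getReturnType(ServerInterface &srvInterface,
                               const SizedColumnTypes &inputTypes,
                               SizedColumnTypes &outputTypes)
    { outputTypes.addInt("partial_count"); } // phase 1 output schema

    virtual TransformFunction *createTransformFunction(ServerInterface &srvInterface)
    { return vt_createFuncObj(srvInterface.allocator, LocalCount); }
  };

  class GlobalPhase : public TransformFunctionPhase
  {
    virtual void getReturnType(ServerInterface &srvInterface,
                               const SizedColumnTypes &inputTypes,
                               SizedColumnTypes &outputTypes)
    { outputTypes.addInt("total_count"); } // final output schema

    virtual TransformFunction *createTransformFunction(ServerInterface &srvInterface)
    { return vt_createFuncObj(srvInterface.allocator, GlobalCount); }
  };

  LocalPhase localPhase;   // handles to an instance of each phase
  GlobalPhase globalPhase;

  virtual void getPhases(ServerInterface &srvInterface,
                         std::vector<TransformFunctionPhase *> &phases)
  {
    localPhase.setPrepass();       // phase 1 processes locally stored data
    phases.push_back(&localPhase); // phases execute in the order they are added
    phases.push_back(&globalPhase);
  }

  virtual void getPrototype(ServerInterface &srvInterface,
                            ColumnTypes &argTypes, ColumnTypes &returnType)
  {
    argTypes.addInt();   // input to the first phase
    returnType.addInt(); // output of the final phase
  }
};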

API

In C++, the MultiPhaseTransformFunctionFactory class extends TransformFunctionFactory. The API provides the following additional methods for extension by subclasses:

virtual void getPhases(ServerInterface &srvInterface,
        std::vector< TransformFunctionPhase * > &phases)=0;

If you use this factory, you must also extend TransformFunctionPhase. See the SDK reference documentation.

In Java, the MultiPhaseTransformFunctionFactory class extends TransformFunctionFactory. The API provides the following additional methods for extension by subclasses:

public abstract void getPhases(ServerInterface srvInterface,
        Vector< TransformFunctionPhase > phases);

If you use this factory, you must also extend TransformFunctionPhase. See the SDK reference documentation.

In Python, the MultiPhaseTransformFunctionFactory class extends TransformFunctionFactory. For each phase, the factory must define a class that extends TransformFunctionPhase.

The factory adds the following method:


def getPhases(cls, srv)

TransformFunctionPhase has the following methods:


def createTransformFunction(cls, srv)

def getReturnType(self, srv_interface, input_types, output_types)

4 - Improving query performance (C++ only)

When evaluating a query, the Vertica optimizer might sort its input to improve performance.

When evaluating a query, the Vertica optimizer might sort its input to improve performance. If a function already returns sorted data, this means the optimizer is doing extra work. A transform function written in C++ can declare how the data it returns is sorted, and the optimizer can take advantage of that information.

A transform function does the actual sorting in the function's processPartition() method. To take advantage of this optimization, sorts must be ascending. You need not sort all columns, but you must return the sorted column or columns first.

You can declare how the function sorts its output in the factory's getReturnType() method.
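
On the function side, a minimal sketch of what ascending output looks like, assuming a hypothetical UDTF that reads and returns a single INTEGER column:

#include <algorithm>
#include <vector>

class SortedEcho : public TransformFunction
{
  virtual void processPartition(ServerInterface &srvInterface,
                                PartitionReader &inputReader,
                                PartitionWriter &outputWriter)
  {
    std::vector<vint> values;
    do {
      vint v = inputReader.getIntRef(0);
      if (v != vint_null)    // skip SQL NULLs in this sketch
        values.push_back(v);
    } while (inputReader.next());

    std::sort(values.begin(), values.end()); // ascending, as this optimization requires
    for (size_t i = 0; i < values.size(); ++i) {
      outputWriter.setInt(0, values[i]); // the sorted column is returned first
      outputWriter.next();
    }
  }
};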

The PolyTopKPerPartition example sorts input columns and returns a given number of rows:

=> SELECT polykSort(14, a, b, c) OVER (ORDER BY a, b, c)
    AS (sort1,sort2,sort3) FROM observations ORDER BY 1,2,3;
 sort1 | sort2 | sort3
-------+-------+-------
     1 |     1 |     1
     1 |     1 |     2
     1 |     1 |     3
     1 |     2 |     1
     1 |     2 |     2
     1 |     3 |     1
     1 |     3 |     2
     1 |     3 |     3
     1 |     3 |     4
     2 |     1 |     1
     2 |     1 |     2
     2 |     2 |     3
     2 |     2 |    34
     2 |     3 |     5
(14 rows)

The factory declares this sorting in getReturnType() by setting the isSortedBy property on each column. Each SizedColumnType has an associated Properties object where this value can be set:


virtual void getReturnType(ServerInterface &srvInterface, const SizedColumnTypes &inputTypes, SizedColumnTypes &outputTypes)
{
    vector<size_t> argCols; // Argument column indexes.
    inputTypes.getArgumentColumns(argCols);
    size_t colIdx = 0;

    // Skip the first argument (the requested row count); the remaining
    // arguments are the columns to sort and output.
    for (vector<size_t>::iterator i = argCols.begin() + 1; i < argCols.end(); i++)
    {
        SizedColumnTypes::Properties props;
        props.isSortedBy = true;
        std::stringstream cname;
        cname << "col" << colIdx++;
        outputTypes.addArg(inputTypes.getColumnType(*i), cname.str(), props);
    }
}

You can see the effects of this optimization by reviewing the EXPLAIN plans for queries with and without this setting. The following output shows the query plan for polyk, the unsorted version. Note the cost for sorting:

=> EXPLAIN SELECT polyk(14, a, b, c) OVER (ORDER BY a, b, c)
    FROM observations ORDER BY 1,2,3;

 Access Path:
 +-SORT [Cost: 2K, Rows: 10K] (PATH ID: 1)
 |  Order: col0 ASC, col1 ASC, col2 ASC
 | +---> ANALYTICAL [Cost: 2K, Rows: 10K] (PATH ID: 2)
 | |      Analytic Group
 | |       Functions: polyk()
 | |       Group Sort: observations.a ASC NULLS LAST, observations.b ASC NULLS LAST, observations.c ASC NULLS LAST
 | | +---> STORAGE ACCESS for observations [Cost: 2K, Rows: 10K] (PATH ID: 3)
 | | |      Projection: public.observations_super
 | | |      Materialize: observations.a, observations.b, observations.c

The query plan for the sorted version omits this step (and cost) and starts with the analytical step (the second step in the previous plan):

=> EXPLAIN SELECT polykSort(14, a, b, c) OVER (ORDER BY a, b, c)
    FROM observations ORDER BY 1,2,3;

Access Path:
 +-ANALYTICAL [Cost: 2K, Rows: 10K] (PATH ID: 2)
 |  Analytic Group
 |   Functions: polykSort()
 |   Group Sort: observations.a ASC NULLS LAST, observations.b ASC NULLS LAST, observations.c ASC NULLS LAST
 | +---> STORAGE ACCESS for observations [Cost: 2K, Rows: 10K] (PATH ID: 3)
 | |      Projection: public.observations_super
 | |      Materialize: observations.a, observations.b, observations.c

5 - Partitioning options for processing local data

UDTFs typically process data that is partitioned in a specific way. For example, a UDTF that processes a web server log file to count the number of hits referred by each partner web site needs to have its input partitioned by a referrer column. Each instance of the UDTF sees the hits referred by a particular partner site so it can count them.

In cases like this, the window partitioning clause should use a PARTITION BY clause. Each node in the cluster partitions the data it stores, sends some of these partitions off to other nodes, and then consolidates the partitions it receives from other nodes and runs an instance of the UDTF to process them.

In other cases, a UDTF might not need to partition input data in a particular way—for example, a UDTF that parses data out of an Apache log file. In this case, you can specify that each UDTF instance process only the data that is stored locally by the node on which it is running. By eliminating the overhead of partitioning data, processing can be much more efficient.

You can tell a UDTF to process only local data with one of the following window partitioning options:

  • PARTITION BEST: For thread-safe UDTFs only, optimizes performance through multi-threaded queries across multiple nodes.

  • PARTITION NODES: Optimizes performance of single-threaded queries across multiple nodes.

The query must specify a source table that is replicated across all nodes and contains a single row (similar to the DUAL table). For example, the following statements call a UDTF that parses locally-stored Apache log files:

=> CREATE TABLE rep (dummy INTEGER) UNSEGMENTED ALL NODES;
CREATE TABLE
=> INSERT INTO rep VALUES (1);
 OUTPUT
--------
      1
(1 row)
=> SELECT ParseLogFile('/data/apache/log*') OVER (PARTITION BEST) FROM rep;

6 - C++ example: string tokenizer

The following example shows a subclass of TransformFunction named StringTokenizer. It defines a UDTF that reads a table containing an INTEGER ID column and a VARCHAR column. It breaks the text in the VARCHAR column into tokens (individual words). It returns a table containing each token, the row it occurred in, and its position within the string.

Loading and using the example

The following example shows how to load the function into Vertica. It assumes that the TransformFunctions.so library that contains the function has been copied to the dbadmin user's home directory on the initiator node.

=> CREATE LIBRARY TransformFunctions AS
   '/home/dbadmin/TransformFunctions.so';
CREATE LIBRARY
=> CREATE TRANSFORM FUNCTION tokenize
   AS LANGUAGE 'C++' NAME 'TokenFactory' LIBRARY TransformFunctions;
CREATE TRANSFORM FUNCTION

You can then use it from SQL statements, for example:


=> CREATE TABLE T (url varchar(30), description varchar(2000));
CREATE TABLE
=> INSERT INTO T VALUES ('www.amazon.com','Online retail merchant and provider of cloud services');
 OUTPUT
--------
      1
(1 row)
=> INSERT INTO T VALUES ('www.vertica.com','World''s fastest analytic database');
 OUTPUT
--------
      1
(1 row)
=> COMMIT;
COMMIT

=> -- Invoke the UDTF
=> SELECT url, tokenize(description) OVER (partition by url) FROM T;
       url       |   words
-----------------+-----------
 www.amazon.com  | Online
 www.amazon.com  | retail
 www.amazon.com  | merchant
 www.amazon.com  | and
 www.amazon.com  | provider
 www.amazon.com  | of
 www.amazon.com  | cloud
 www.amazon.com  | services
 www.vertica.com | World's
 www.vertica.com | fastest
 www.vertica.com | analytic
 www.vertica.com | database
(12 rows)

Notice that the number of rows and columns in the result table differ from those of the input table. This is one of the strengths of a UDTF.

TransformFunction implementation

The following code shows the StringTokenizer class.

class StringTokenizer : public TransformFunction
{
  virtual void processPartition(ServerInterface &srvInterface,
                                PartitionReader &inputReader,
                                PartitionWriter &outputWriter)
  {
    try {
      if (inputReader.getNumCols() != 1)
        vt_report_error(0, "Function only accepts 1 argument, but %zu provided", inputReader.getNumCols());

      do {
        const VString &sentence = inputReader.getStringRef(0);

        // If input string is NULL, then output is NULL as well
        if (sentence.isNull())
          {
            VString &word = outputWriter.getStringRef(0);
            word.setNull();
            outputWriter.next();
          }
        else
          {
            // Otherwise, let's tokenize the string and output the words
            std::string tmp = sentence.str();
            std::istringstream ss(tmp);

            do
              {
                std::string buffer;
                ss >> buffer;

                // Copy to output
                if (!buffer.empty()) {
                  VString &word = outputWriter.getStringRef(0);
                  word.copy(buffer);
                  outputWriter.next();
                }
              } while (ss);
          }
      } while (inputReader.next() && !isCanceled());
    } catch(std::exception& e) {
      // Standard exception. Quit.
      vt_report_error(0, "Exception while processing partition: [%s]", e.what());
    }
  }
};

The processPartition() function in this example follows a pattern that you will follow in your own UDTF: it loops over all rows in the table partition that Vertica sends it, processing each row and checking for cancellation before advancing. For UDTFs, you do not have to actually process every row; you can exit your function without having read all of the input. You might choose to do this if your UDTF is performing a search or some other operation where it can determine that the rest of the input is unneeded.

In this example, processPartition() first extracts the VString containing the text from the PartitionReader object. The VString class represents a Vertica string value (VARCHAR or CHAR). If there is input, it then tokenizes it and adds it to the output using the PartitionWriter object.

Similarly to reading input columns, the PartitionWriter class has functions for writing each type of data to the output row. In this case, the example calls the PartitionWriter object's getStringRef() function to allocate a new VString object to hold the token to output for the first column, and then copies the token's value into the VString.

TransformFunctionFactory implementation

The following code shows the factory class.

class TokenFactory : public TransformFunctionFactory
{
  // Tell Vertica that we take in a row with 1 string, and return a row with 1 string
  virtual void getPrototype(ServerInterface &srvInterface, ColumnTypes &argTypes, ColumnTypes &returnType)
  {
    argTypes.addVarchar();
    returnType.addVarchar();
  }

  // Tell Vertica what our return string length will be, given the input
  // string length
  virtual void getReturnType(ServerInterface &srvInterface,
                             const SizedColumnTypes &inputTypes,
                             SizedColumnTypes &outputTypes)
  {
    // Error out if we're called with anything but 1 argument
    if (inputTypes.getColumnCount() != 1)
      vt_report_error(0, "Function only accepts 1 argument, but %zu provided", inputTypes.getColumnCount());

    int input_len = inputTypes.getColumnType(0).getStringLength();

    // Our output size will never be more than the input size
    outputTypes.addVarchar(input_len, "words");
  }

  virtual TransformFunction *createTransformFunction(ServerInterface &srvInterface)
  { return vt_createFuncObject<StringTokenizer>(srvInterface.allocator); }

};

In this example:

  • The UDTF takes a VARCHAR column as input. To define the input column, getPrototype() calls addVarchar() on the ColumnTypes object that represents the input table.

  • The UDTF returns a VARCHAR as output. The getPrototype() function calls addVarchar() to define the output table.

This example must return the maximum length of the VARCHAR output column. It sets the length to the length of the input string. This is a safe value, because the output will never be longer than the input string. It also sets the name of the VARCHAR output column to "words".

The implementation of the createTransformFunction() function in the example is boilerplate code. It just calls the vt_createFuncObject template with the name of the TransformFunction class associated with this factory class. This template takes care of instantiating a copy of the TransformFunction class that Vertica can use to process data.

The RegisterFactory macro

The final step in creating your UDTF is to call the RegisterFactory macro. This macro ensures that your factory class is instantiated when Vertica loads the shared library containing your UDTF. Having your factory class instantiated is the only way that Vertica can find your UDTF and determine what its inputs and outputs are.

The RegisterFactory macro just takes the name of your factory class:

RegisterFactory(TokenFactory);

7 - Python example: string tokenizer

The following example shows a transform function that breaks an input string into tokens (based on whitespace). It is similar to the tokenizer examples for C++ and Java.

Loading and using the example

Create the library and function:

=> CREATE LIBRARY pyudtf AS '/home/dbadmin/udx/tokenize.py' LANGUAGE 'Python';
CREATE LIBRARY
=> CREATE TRANSFORM FUNCTION tokenize AS NAME 'StringTokenizerFactory' LIBRARY pyudtf;
CREATE TRANSFORM FUNCTION

You can then use the function in SQL statements, for example:

=> CREATE TABLE words (w VARCHAR);
CREATE TABLE
=> COPY words FROM STDIN;
Enter data to be copied followed by a newline.
End with a backslash and a period on a line by itself.
>> this is a test of the python udtf
>> \.

=> SELECT tokenize(w) OVER () FROM words;
  tokens
----------
 this
 is
 a
 test
 of
 the
 python
 udtf
(8 rows)

Setup

All Python UDxs must import the Vertica SDK.

import vertica_sdk

UDTF Python code

The following code defines the tokenizer and its factory.

class StringTokenizer(vertica_sdk.TransformFunction):
    """
    Transform function which tokenizes its inputs.
    For each input string, each of the whitespace-separated tokens of that
    string is produced as output.
    """
    def processPartition(self, server_interface, input, output):
        while True:
            for token in input.getString(0).split():
                output.setString(0, token)
                output.next()
            if not input.next():
                break


class StringTokenizerFactory(vertica_sdk.TransformFunctionFactory):
    def getPrototype(self, server_interface, arg_types, return_type):
        arg_types.addVarchar()
        return_type.addVarchar()
    def getReturnType(self, server_interface, arg_types, return_type):
        return_type.addColumn(arg_types.getColumnType(0), "tokens")
    def createTransformFunction(cls, server_interface):
        return StringTokenizer()

8 - R example: log tokenizer

The LogTokenizer transform function reads a VARCHAR from a table, a log message. It then tokenizes each log message, returning each of the tokens.

You can find more UDx examples in the Vertica GitHub repository, https://github.com/vertica/UDx-Examples.

Load the function and library

Create the library and the function.

=> CREATE OR REPLACE LIBRARY rLib AS 'log_tokenizer.R' LANGUAGE 'R';
CREATE LIBRARY
=> CREATE OR REPLACE TRANSFORM FUNCTION LogTokenizer AS LANGUAGE 'R' NAME 'LogTokenizerFactory' LIBRARY rLib FENCED;
CREATE FUNCTION

Querying data with the function

The following query shows how you can run a query with the UDTF.

=> SELECT machine,
          LogTokenizer(error_log USING PARAMETERS spliton = ' ') OVER(PARTITION BY machine)
     FROM error_logs;
 machine |  Token
---------+---------
 node001 | ERROR
 node001 | 345
 node001 | -
 node001 | Broken
 node001 | pipe
 node001 | WARN
 node001 | -
 node001 | Nearly
 node001 | filled
 node001 | disk
 node002 | ERROR
 node002 | 111
 node002 | -
 node002 | Flooded
 node002 | roads
 node003 | ERROR
 node003 | 222
 node003 | -
 node003 | Plain
 node003 | old
 node003 | broken
(21 rows)

UDTF R code


LogTokenizer <- function(input.data.frame, parameters.data.frame) {
  # Take the spliton parameter passed by the user and assign it to a variable
  # in the function so we can use that as our tokenizer.
  if ( is.null(parameters.data.frame[['spliton']]) ) {
    stop("NULL value for spliton! Token cannot be NULL.")
  } else {
    split.on <- as.character(parameters.data.frame[['spliton']])
  }
  # Tokenize the string.
  tokens <- vector(length=0)
  for ( string in input.data.frame[, 1] ) {
    tokenized.string <- strsplit(string, split.on)
    for ( token in tokenized.string ) {
      tokens <- append(tokens, token)
    }
  }
  final.output <- data.frame(tokens)
  return(final.output)
}

LogTokenizerFactory <- function() {
  list(name    = LogTokenizer,
       udxtype = c("transform"),
       intype  = c("varchar"),
       outtype = c("varchar"),
       outtypecallback=LogTokenizerReturn,
       parametertypecallback=LogTokenizerParameters)
}


LogTokenizerParameters <- function() {
  parameters <- list(datatype = c("varchar"),
                     length   = c("NA"),
                     scale    = c("NA"),
                     name     = c("spliton"))
  return(parameters)
}

LogTokenizerReturn <- function(arg.data.frame, parm.data.frame) {
  output.return.type <- data.frame(datatype = rep(NA,1),
                                   length   = rep(NA,1),
                                   scale    = rep(NA,1),
                                   name     = rep(NA,1))
  output.return.type$datatype <- c("varchar")
  output.return.type$name <- c("Token")
  return(output.return.type)
}

9 - C++ example: multi-phase indexer

The following code fragment is from the InvertedIndex UDTF example distributed with the Vertica SDK. It demonstrates subclassing MultiPhaseTransformFunctionFactory, including two TransformFunctionPhase subclasses that define the two phases in this UDTF.

class InvertedIndexFactory : public MultiPhaseTransformFunctionFactory
{
public:
   /**
    * Extracts terms from documents.
    */
   class ForwardIndexPhase : public TransformFunctionPhase
   {
       virtual void getReturnType(ServerInterface &srvInterface,
                                  const SizedColumnTypes &inputTypes,
                                  SizedColumnTypes &outputTypes)
       {
           // Sanity checks on input we've been given.
           // Expected input: (doc_id INTEGER, text VARCHAR)
           vector<size_t> argCols;
           inputTypes.getArgumentColumns(argCols);
           if (argCols.size() < 2 ||
               !inputTypes.getColumnType(argCols.at(0)).isInt() ||
               !inputTypes.getColumnType(argCols.at(1)).isVarchar())
               vt_report_error(0, "Function only accepts two arguments"
                                "(INTEGER, VARCHAR))");
           // Output of this phase is:
           //   (term_freq INTEGER) OVER(PBY term VARCHAR OBY doc_id INTEGER)
           // Number of times term appears within a document.
           outputTypes.addInt("term_freq");
           // Add analytic clause columns: (PARTITION BY term ORDER BY doc_id).
           // The length of any term is at most the size of the entire document.
           outputTypes.addVarcharPartitionColumn(
                inputTypes.getColumnType(argCols.at(1)).getStringLength(),
                "term");
           // Add order column on the basis of the document id's data type.
           outputTypes.addOrderColumn(inputTypes.getColumnType(argCols.at(0)),
                                      "doc_id");
       }
       virtual TransformFunction *createTransformFunction(ServerInterface
                &srvInterface)
       { return vt_createFuncObj(srvInterface.allocator, ForwardIndexBuilder); }
   };
   /**
    * Constructs terms' posting lists.
    */
   class InvertedIndexPhase : public TransformFunctionPhase
   {
       virtual void getReturnType(ServerInterface &srvInterface,
                                  const SizedColumnTypes &inputTypes,
                                  SizedColumnTypes &outputTypes)
       {
           // Sanity checks on input we've been given.
           // Expected input:
           //   (term_freq INTEGER) OVER(PBY term VARCHAR OBY doc_id INTEGER)
           vector<size_t> argCols;
           inputTypes.getArgumentColumns(argCols);
           vector<size_t> pByCols;
           inputTypes.getPartitionByColumns(pByCols);
           vector<size_t> oByCols;
           inputTypes.getOrderByColumns(oByCols);
           if (argCols.size() != 1 || pByCols.size() != 1 || oByCols.size() != 1 ||
               !inputTypes.getColumnType(argCols.at(0)).isInt() ||
               !inputTypes.getColumnType(pByCols.at(0)).isVarchar() ||
               !inputTypes.getColumnType(oByCols.at(0)).isInt())
               vt_report_error(0, "Function expects an argument (INTEGER) with "
                               "analytic clause OVER(PBY VARCHAR OBY INTEGER)");
           // Output of this phase is:
           //   (term VARCHAR, doc_id INTEGER, term_freq INTEGER, corp_freq INTEGER).
           outputTypes.addVarchar(inputTypes.getColumnType(
                                    pByCols.at(0)).getStringLength(),"term");
           outputTypes.addInt("doc_id");
           // Number of times term appears within the document.
           outputTypes.addInt("term_freq");
           // Number of documents where the term appears in.
           outputTypes.addInt("corp_freq");
       }

       virtual TransformFunction *createTransformFunction(ServerInterface
                &srvInterface)
       { return vt_createFuncObj(srvInterface.allocator, InvertedIndexBuilder); }
   };
   ForwardIndexPhase fwardIdxPh;
   InvertedIndexPhase invIdxPh;
   virtual void getPhases(ServerInterface &srvInterface,
        std::vector<TransformFunctionPhase *> &phases)
   {
       fwardIdxPh.setPrepass(); // Process documents wherever they're originally stored.
       phases.push_back(&fwardIdxPh);
       phases.push_back(&invIdxPh);
   }
   virtual void getPrototype(ServerInterface &srvInterface,
                             ColumnTypes &argTypes,
                             ColumnTypes &returnType)
   {
       // Expected input: (doc_id INTEGER, text VARCHAR).
       argTypes.addInt();
       argTypes.addVarchar();
       // Output is: (term VARCHAR, doc_id INTEGER, term_freq INTEGER, corp_freq INTEGER)
       returnType.addVarchar();
       returnType.addInt();
       returnType.addInt();
       returnType.addInt();
   }
};
RegisterFactory(InvertedIndexFactory);

Most of the code in this example is similar to the code in a TransformFunctionFactory class:

  • Both TransformFunctionPhase subclasses implement the getReturnType() function, which describes the output of each stage. This is similar to the getReturnType() function from the TransformFunctionFactory class. However, this function also lets you control how the data is partitioned and ordered between each phase of your multi-phase UDTF.

    The first phase calls SizedColumnTypes::addVarcharPartitionColumn() (rather than just addVarcharColumn()) to set the phase's output table to be partitioned by the column containing the extracted words. It also calls SizedColumnTypes::addOrderColumn() to order the output table by the document ID column. It calls this function instead of one of the data-type-specific functions (such as addIntOrderColumn()) so it can pass the data type of the original column through to the output column.

  • The MultiPhaseTransformFunctionFactory class implements the getPrototype() function, which defines the schemas for the input and output of the multi-phase UDTF. This function is the same as the TransformFunctionFactory::getPrototype() function.

The unique function implemented by the MultiPhaseTransformFunctionFactory class is getPhases(). This function defines the order in which the phases are executed. The fields that represent the phases are pushed into this vector in the order they should execute.

The MultiPhaseTransformFunctionFactory.getPhases() function is also where you flag the first phase of the UDTF as operating on data stored locally on the node (called a "pre-pass" phase) rather than on data partitioned across all nodes. Using this option increases the efficiency of your multi-phase UDTF by avoiding having to move significant amounts of data around the Vertica cluster.

To mark the first phase as pre-pass, you call the TransformFunctionPhase::setPrepass() function of the first phase's TransformFunctionPhase instance from within the getPhases() function.

Notes

  • You need to ensure that the output schema of each phase matches the input schema expected by the next phase. In the example code, each TransformFunctionPhase::getReturnType() implementation performs a sanity check on its input and output schemas. Your TransformFunction subclasses can also perform these checks in their processPartition() function.

  • There is no built-in limit on the number of phases that your multi-phase UDTF can have. However, more phases use more resources. When running in fenced mode, Vertica may terminate UDTFs that use too much memory. See Resource use for C++ UDxs.

10 - Python example: multi-phase calculation

The following example shows a multi-phase transform function that computes the average value on a column of numbers in an input table. It first defines two transform functions, and then defines a factory that creates the phases using them.

See AvgMultiPhaseUDT.py in the examples distribution for the complete code.

Loading and using the example

Create the library and function:

=> CREATE LIBRARY pylib_avg AS '/home/dbadmin/udx/AvgMultiPhaseUDT.py' LANGUAGE 'Python';
CREATE LIBRARY
=> CREATE TRANSFORM FUNCTION myAvg AS NAME 'MyAvgFactory' LIBRARY pylib_avg;
CREATE TRANSFORM FUNCTION

You can then use the function in SELECT statements:

=> CREATE TABLE IF NOT EXISTS numbers(num FLOAT);
CREATE TABLE

=> COPY numbers FROM STDIN delimiter ',';
1
2
3
4
\.

=> SELECT myAvg(num) OVER() FROM numbers;
 average | ignored_rows | total_rows
---------+--------------+------------
     2.5 |            0 |          4
(1 row)

Setup

All Python UDxs must import the Vertica SDK. This example also imports another library.

import vertica_sdk
import math

Component transform functions

A multi-phase transform function must define two or more TransformFunction subclasses to be used in the phases. This example uses two classes: LocalCalculation, which does calculations on local partitions, and GlobalCalculation, which aggregates the results of all LocalCalculation instances to calculate a final result.

In both functions, the calculation is done in the processPartition() function:

class LocalCalculation(vertica_sdk.TransformFunction):
    """
    This class is the first phase and calculates the local values for sum, ignored_rows and total_rows.
    """

    def setup(self, server_interface, col_types):
        server_interface.log("Setup: Phase0")
        self.local_sum = 0.0
        self.ignored_rows = 0
        self.total_rows = 0

    def processPartition(self, server_interface, input, output):
        server_interface.log("Process Partition: Phase0")

        while True:
            self.total_rows += 1

            if input.isNull(0) or math.isinf(input.getFloat(0)) or math.isnan(input.getFloat(0)):
                # Null, Inf, or Nan is ignored
                self.ignored_rows += 1
            else:
                self.local_sum += input.getFloat(0)

            if not input.next():
                break

        output.setFloat(0, self.local_sum)
        output.setInt(1, self.ignored_rows)
        output.setInt(2, self.total_rows)
        output.next()

class GlobalCalculation(vertica_sdk.TransformFunction):
    """
    This class is the second phase and aggregates the values for sum, ignored_rows and total_rows.
    """

    def setup(self, server_interface, col_types):
        server_interface.log("Setup: Phase1")
        self.global_sum = 0.0
        self.ignored_rows = 0
        self.total_rows = 0

    def processPartition(self, server_interface, input, output):
        server_interface.log("Process Partition: Phase1")

        while True:
            self.global_sum += input.getFloat(0)
            self.ignored_rows += input.getInt(1)
            self.total_rows += input.getInt(2)

            if not input.next():
                break

        average = self.global_sum / (self.total_rows - self.ignored_rows)

        output.setFloat(0, average)
        output.setInt(1, self.ignored_rows)
        output.setInt(2, self.total_rows)
        output.next()

Multi-phase factory

A MultiPhaseTransformFunctionFactory ties together the individual functions as phases. The factory defines a TransformFunctionPhase for each function. Each phase defines createTransformFunction(), which calls the constructor for the corresponding TransformFunction, and getReturnType().

The first phase, LocalPhase, follows.


class MyAvgFactory(vertica_sdk.MultiPhaseTransformFunctionFactory):
    """ Factory class """

    class LocalPhase(vertica_sdk.TransformFunctionPhase):
        """ Phase 1 """
        def getReturnType(self, server_interface, input_types, output_types):
            # sanity check
            number_of_cols = input_types.getColumnCount()
            if (number_of_cols != 1 or not input_types.getColumnType(0).isFloat()):
                raise ValueError("Function only accepts one argument (FLOAT))")

            output_types.addFloat("local_sum")
            output_types.addInt("ignored_rows")
            output_types.addInt("total_rows")

        def createTransformFunction(cls, server_interface):
            return LocalCalculation()

The second phase, GlobalPhase, does not check its inputs because the first phase already did. As with the first phase, createTransformFunction merely constructs and returns the corresponding TransformFunction.


    class GlobalPhase(vertica_sdk.TransformFunctionPhase):
        """ Phase 2 """
        def getReturnType(self, server_interface, input_types, output_types):
            output_types.addFloat("average")
            output_types.addInt("ignored_rows")
            output_types.addInt("total_rows")

        def createTransformFunction(cls, server_interface):
            return GlobalCalculation()

After defining the TransformFunctionPhase subclasses, the factory instantiates them and chains them together in getPhases().

    ph0Instance = LocalPhase()
    ph1Instance = GlobalPhase()

    def getPhases(cls, server_interface):
        cls.ph0Instance.setPrepass()
        phases = [cls.ph0Instance, cls.ph1Instance]
        return phases