A user-defined transform function (UDTF) lets you transform a table of data into another table. It reads one or more arguments (treated as a row of data), and returns zero or more rows of data consisting of one or more columns. A UDTF can produce any number of rows as output. However, each row it outputs must be complete. Advancing to the next row without having added a value for each column produces incorrect results.
The schema of the output table does not need to correspond to the schema of the input table; the two can be completely different. The UDTF can return any number of output rows for each row of input.
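As a plain-Python illustration (not the Vertica SDK), the hypothetical function below turns each input row (name, n) into n output rows (name, seq). The output schema differs from the input, and a row with n of zero or less produces no output at all.

```python
from typing import Iterable, Iterator, Tuple


def repeat_rows(rows: Iterable[Tuple[str, int]]) -> Iterator[Tuple[str, int]]:
    """Hypothetical transform: emit n rows (name, seq) for each input row (name, n)."""
    for name, n in rows:
        # range(1, n + 1) is empty when n <= 0, so such rows produce no output.
        for seq in range(1, n + 1):
            # Every output row supplies a value for both output columns.
            yield (name, seq)


print(list(repeat_rows([("a", 2), ("b", 0), ("c", 1)])))
# [('a', 1), ('a', 2), ('c', 1)]
```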
Unless a UDTF is marked as one-to-many in its factory function, it can only be used in a SELECT list that contains the UDTF call and a required OVER clause. A multi-phase UDTF can make use of partition columns (PARTITION BY), but other UDTFs cannot.
UDTFs are run after GROUP BY, but before the final ORDER BY, when used in conjunction with GROUP BY and ORDER BY in a statement. The ORDER BY clause may contain only columns or expressions that are in a window partition clause (see Window partitioning).
UDTFs can take up to 9800 parameters (input columns). Attempts to pass more parameters to a UDTF return an error.
1 - TransformFunction class
The TransformFunction class is where you perform the data-processing, transforming input rows into output rows. Your subclass must define the processPartition() method. It may define methods to set up and tear down the function.
Performing the transformation
The processPartition() method carries out all of the processing that you want your UDTF to perform. When a user calls your function in a SQL statement, Vertica bundles together the data from the function parameters and passes it to processPartition().
The input and output of the processPartition() method are supplied by objects of the PartitionReader and PartitionWriter classes. They define methods that you use to read the input data and write the output data for your UDTF.
A UDTF does not necessarily operate on a single row the way a UDSF does. A UDTF can read any number of rows and write output at any time.
Consider the following guidelines when implementing processPartition():
Extract each input parameter by calling the corresponding data-type-specific function on the PartitionReader object. Each of these functions takes a single parameter: the column number in the input row that you want to read. Your function might need to handle NULL values.
When writing output, your UDTF must supply values for all of the output columns you defined in your factory. Similarly to reading input columns, the PartitionWriter object has functions for writing each type of data to the output row.
Use PartitionReader.next() to determine if there is more input to process, and exit when the input is exhausted.
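The guidelines above can be sketched in plain Python. The reader and writer classes here are hypothetical stand-ins, not the Vertica SDK. Their method names (isNull, getInt, getFloat, setInt, setFloat, next) are modeled on the calls used in the Python multi-phase example later in this section. The loop does four things:

- It reads typed columns.
- It skips NULL inputs.
- It writes every output column before advancing.
- It stops when next() reports that the input is exhausted.

```python
class FakePartitionReader:
    """Hypothetical stand-in for PartitionReader: walks a list of row tuples."""

    def __init__(self, rows):
        self._rows = rows
        self._pos = 0

    def isNull(self, col):
        return self._rows[self._pos][col] is None

    def getInt(self, col):
        return int(self._rows[self._pos][col])

    def getFloat(self, col):
        return float(self._rows[self._pos][col])

    def next(self):
        # Advance one row; return False once the input is exhausted.
        self._pos += 1
        return self._pos < len(self._rows)


class FakePartitionWriter:
    """Hypothetical stand-in for PartitionWriter: collects finished rows."""

    def __init__(self, num_cols):
        self.num_cols = num_cols
        self.rows = []
        self._current = {}

    def setInt(self, col, value):
        self._current[col] = value

    def setFloat(self, col, value):
        self._current[col] = value

    def next(self):
        # The real SDK silently produces incorrect results for an incomplete
        # row; this stand-in raises instead so the mistake is visible.
        if len(self._current) != self.num_cols:
            raise ValueError("output row is missing a column value")
        self.rows.append(tuple(self._current[c] for c in range(self.num_cols)))
        self._current = {}


def process_partition(reader, writer):
    """(id INTEGER, value FLOAT) -> (id INTEGER, doubled FLOAT); NULL values are skipped.

    Assumes the partition holds at least one row, like the do/while loops
    in the examples in this section.
    """
    while True:
        if not reader.isNull(1):
            writer.setInt(0, reader.getInt(0))
            writer.setFloat(1, reader.getFloat(1) * 2)
            writer.next()
        if not reader.next():
            break


reader = FakePartitionReader([(1, 1.5), (2, None), (3, 4.0)])
writer = FakePartitionWriter(num_cols=2)
process_partition(reader, writer)
print(writer.rows)  # [(1, 3.0), (3, 8.0)]
```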
In some cases, you might want to determine the number and types of parameters using PartitionReader's getNumCols() and getTypeMetaData() functions, instead of just hard-coding the data types of the columns in the input row. This is useful if you want your TransformFunction to be able to process input tables with different schemas. You can then use different TransformFunctionFactory classes to define multiple function signatures that call the same TransformFunction class. See Overloading your UDx for more information.
Setting up and tearing down
The TransformFunction class defines two additional methods that you can optionally implement to allocate and free resources: setup() and destroy(). You should use these methods to allocate and deallocate resources that you do not allocate through the UDx API (see Allocating resources for UDxs for details).
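The lifecycle can be sketched in Python with a hypothetical driver, run_udtf, that calls setup(), then processPartition(), then destroy(). The sketch makes three assumptions:

- setup() uses the setup(self, server_interface, col_types) form from the Python multi-phase example later in this section.
- destroy() takes the same arguments as setup().
- Plain lists stand in for the reader and writer.

```python
class StopWordFilter:
    """Hypothetical UDTF body: setup() builds a lookup set, destroy() releases it."""

    def setup(self, server_interface, col_types):
        # Allocate a resource that is not managed through the UDx API.
        self.stop_words = {"a", "an", "of", "the"}

    def processPartition(self, server_interface, input_rows, output_rows):
        # Plain lists stand in for PartitionReader and PartitionWriter.
        for word in input_rows:
            if word.lower() not in self.stop_words:
                output_rows.append(word)

    def destroy(self, server_interface, col_types):
        # Release what setup() allocated.
        self.stop_words = None


def run_udtf(fn, rows):
    """Hypothetical driver: setup, then processPartition, then destroy."""
    output = []
    fn.setup(None, None)
    try:
        fn.processPartition(None, rows, output)
    finally:
        # Release resources even if processing raises.
        fn.destroy(None, None)
    return output


f = StopWordFilter()
print(run_udtf(f, ["The", "fastest", "of", "databases"]))  # ['fastest', 'databases']
```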
The TransformFunction API provides methods for extension by subclasses, including processPartition(), setup(), and destroy(). The PartitionReader and PartitionWriter classes provide getters and setters for column values, along with next() to iterate through partitions. See the API reference documentation for details.
For UDTFs written in R, implement the Main function API to define a transform function:
FunctionName <- function(input.data.frame, parameters.data.frame) {
  # Computations

  # The function must return a data frame.
  return(output.data.frame)
}
2 - TransformFunctionFactory class
The TransformFunctionFactory class tells Vertica metadata about your UDTF: its number of parameters and their data types, as well as function properties and the data type of the return value. It also instantiates a subclass of TransformFunction.
You must implement the following methods in your TransformFunctionFactory:
getPrototype() populates two ColumnTypes objects that describe the columns your UDTF takes as input and returns as output.
getReturnType() tells Vertica details about the output values: the width of variable-sized data types (such as VARCHAR) and the precision of data types that have settable precision (such as TIMESTAMP). You can also set the names of the output columns using this function. While this method is optional for UDxs that return single values, you must implement it for UDTFs.
createTransformFunction() instantiates your TransformFunction subclass.
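The three required methods can be sketched in plain Python. A single hypothetical ColumnTypes stand-in plays the role of both ColumnTypes and SizedColumnTypes; it is not the SDK class. The logic follows the C++ TokenFactory in this section: one VARCHAR in, one VARCHAR out, and an output width equal to the input width.

```python
class ColumnTypes:
    """Hypothetical stand-in for both ColumnTypes and SizedColumnTypes.

    Each column is recorded as a (type, length, name) tuple.
    """

    def __init__(self, columns=None):
        self.columns = list(columns or [])

    def addVarchar(self, length=None, name=""):
        self.columns.append(("varchar", length, name))

    def getStringLength(self, index):
        return self.columns[index][1]


class Tokenizer:
    """Placeholder for the TransformFunction subclass that the factory creates."""


class TokenFactory:
    def getPrototype(self, srv, arg_types, return_type):
        # One VARCHAR argument in, one VARCHAR column out; widths are not known yet.
        arg_types.addVarchar()
        return_type.addVarchar()

    def getReturnType(self, srv, input_types, output_types):
        if len(input_types.columns) != 1:
            raise ValueError("Function only accepts 1 argument")
        # A token is never longer than the string it came from, so reuse the
        # input width, and name the output column so callers need no alias.
        output_types.addVarchar(input_types.getStringLength(0), "words")

    def createTransformFunction(self, srv):
        return Tokenizer()


sized_out = ColumnTypes()
TokenFactory().getReturnType(None, ColumnTypes([("varchar", 2000, "")]), sized_out)
print(sized_out.columns)  # [('varchar', 2000, 'words')]
```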
For UDTFs written in C++ and Python, you can implement the getTransformFunctionProperties() method to set transform function class properties, including:
isExploder: By default False, indicates whether a single-phase UDTF performs a transform from one input row to a result set of N rows, often called a one-to-many transform. If set to True, each partition to the UDTF must consist of exactly one input row. When a UDTF is labeled as one-to-many, Vertica is able to optimize query plans and users can write SELECT queries that include any expression and do not require an OVER clause. For more information about UDTF partitioning options and instructions on how to set this class property, see Partitioning options for UDTFs. See Python example: explode for an in-depth example detailing a one-to-many UDTF.
3 - MultiPhaseTransformFunctionFactory class
Multi-phase UDTFs let you break your data processing into multiple steps. Using this feature, your UDTFs can perform processing in a way similar to Hadoop or other MapReduce frameworks. You can use the first phase to break down and gather data, and then use subsequent phases to process the data. For example, the first phase of your UDTF could extract specific types of user interactions from a web server log stored in the column of a table, and subsequent phases could perform analysis on those interactions.
Multi-phase UDTFs also let you decide where processing should occur: locally on each node, or throughout the cluster. If your multi-phase UDTF is like a MapReduce process, you want the first phase of your multi-phase UDTF to process data that is stored locally on the node where the instance of the UDTF is running. This prevents large segments of data from being copied around the Vertica cluster. Depending on the type of processing being performed in later phases, you may choose to have the data segmented and distributed across the Vertica cluster.
Each phase of the UDTF is the same as a traditional (single-phase) UDTF: it receives a table as input, and generates a table as output. The schema for each phase's output does not have to match its input, and each phase can output as many or as few rows as it wants.
You create a subclass of TransformFunction to define the processing performed by each stage. If you already have a TransformFunction from a single-phase UDTF that performs the processing you want a phase of your multi-phase UDTF to perform, you can easily adapt it to work within the multi-phase UDTF.
What makes a multi-phase UDTF different from a traditional UDTF is the factory class you use. You define a multi-phase UDTF using a subclass of MultiPhaseTransformFunctionFactory, rather than the TransformFunctionFactory. This special factory class acts as a container for all of the phases in your multi-step UDTF. It provides Vertica with the input and output requirements of the entire multi-phase UDTF (through the getPrototype() function), and a list of all the phases in the UDTF.
Within your subclass of the MultiPhaseTransformFunctionFactory class, you define one or more subclasses of TransformFunctionPhase. These classes fill the same role as the TransformFunctionFactory class for each phase in your multi-phase UDTF. They define the input and output of each phase and create instances of their associated TransformFunction classes to perform the processing for each phase of the UDTF. In addition to these subclasses, your MultiPhaseTransformFunctionFactory includes fields that provide a handle to an instance of each of the TransformFunctionPhase subclasses.
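To make the MapReduce analogy concrete, here is a plain-Python simulation of a two-phase word count. The function names are hypothetical and none of this is the Vertica API:

- phase1_local plays the pre-pass phase that works on each node's local rows.
- shuffle_by_key stands in for Vertica repartitioning the intermediate table on its partition column.
- phase2_global runs once per partition.

```python
from collections import Counter, defaultdict


def phase1_local(node_rows):
    """Pre-pass phase: runs on one node's local rows, emits (word, partial_count)."""
    counts = Counter(word for line in node_rows for word in line.split())
    return list(counts.items())


def shuffle_by_key(phase1_outputs):
    """Stand-in for repartitioning phase-1 output by its partition column (word)."""
    partitions = defaultdict(list)
    for rows in phase1_outputs:
        for word, n in rows:
            partitions[word].append(n)
    return partitions


def phase2_global(partitions):
    """Second phase: one call per partition (word), sums the partial counts."""
    return {word: sum(parts) for word, parts in partitions.items()}


nodes = [["a b a"], ["b c"], ["a"]]  # rows stored locally on three nodes
print(phase2_global(shuffle_by_key(phase1_local(rows) for rows in nodes)))
# {'a': 3, 'b': 2, 'c': 1}
```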
The MultiPhaseTransformFunctionFactory and TransformFunctionPhase classes do not support the isExploder class property.
The MultiPhaseTransformFunctionFactory class extends TransformFunctionFactory. For each phase, the factory must define a class that extends TransformFunctionPhase. See the SDK reference documentation for the additional methods that subclasses can implement.
4 - Improving query performance (C++ only)
When evaluating a query, the Vertica optimizer might sort its input to improve performance. If a function already returns sorted data, this means the optimizer is doing extra work. A transform function written in C++ can declare how the data it returns is sorted, and the optimizer can take advantage of that information.
A transform function does the actual sorting in the function's processPartition() method. To take advantage of this optimization, sorts must be ascending. You need not sort all columns, but you must return the sorted column or columns first.
You can declare how the function sorts its output in the factory's getReturnType() method.
If the sorting declared in the factory does not match the sorting provided by the function, query results can be incorrect.
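A mismatch between the declared and actual sort order can silently produce wrong results. It can therefore help to test a function's output before declaring it sorted. The helper below is a hypothetical test utility, not part of the SDK. It checks that rows are ascending on their leading columns, which is the condition this optimization requires.

```python
from typing import Sequence, Tuple


def is_sorted_ascending(rows: Sequence[Tuple], num_sort_cols: int) -> bool:
    """Return True if rows are ascending on their first num_sort_cols columns."""
    # Tuples compare lexicographically, so the key prefixes can be compared directly.
    keys = [row[:num_sort_cols] for row in rows]
    return all(a <= b for a, b in zip(keys, keys[1:]))


print(is_sorted_ascending([(1, 1, 9), (1, 2, 0), (2, 1, 5)], 2))  # True
```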
The PolyTopKPerPartition example sorts input columns and returns a given number of rows:
=> SELECT polykSort(14, a, b, c) OVER (ORDER BY a, b, c)
AS (sort1,sort2,sort3) FROM observations ORDER BY 1,2,3;
sort1 | sort2 | sort3
1 | 1 | 1
1 | 1 | 2
1 | 1 | 3
1 | 2 | 1
1 | 2 | 2
1 | 3 | 1
1 | 3 | 2
1 | 3 | 3
1 | 3 | 4
2 | 1 | 1
2 | 1 | 2
2 | 2 | 3
2 | 2 | 34
2 | 3 | 5
(14 rows)
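For intuition, the behavior shown above (sort on all three columns and return at most k rows) can be approximated in Python. This is an assumption about what PolyTopKPerPartition computes within one partition, not its actual implementation.

```python
import heapq


def top_k_sorted(k, rows):
    """Return at most k rows, ascending on all columns (tuple order)."""
    # heapq.nsmallest(k, rows) is equivalent to sorted(rows)[:k].
    return heapq.nsmallest(k, rows)


rows = [(2, 1, 2), (1, 3, 1), (1, 1, 2), (1, 1, 1)]
print(top_k_sorted(3, rows))  # [(1, 1, 1), (1, 1, 2), (1, 3, 1)]
```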
The factory declares this sorting in getReturnType() by setting the isSortedBy property on each column. Each SizedColumnType has an associated Properties object where this value can be set:
You can see the effects of this optimization by reviewing the EXPLAIN plans for queries with and without this setting. The following output shows the query plan for polyk, the unsorted version. Note the cost for sorting:
5 - Partitioning options for UDTFs
Depending on the application, a UDTF might require the input data to be partitioned in a specific way. For example, a UDTF that processes a web server log file to count the number of hits referred by each partner web site needs to have its input partitioned by a referrer column. In other cases, such as a string tokenizer, the sort order of the data does not matter. Vertica provides partition options for both of these types of UDTFs.
Data sort required
In cases where a specific sort order is required, the window partitioning clause in the query that calls the UDTF should use a PARTITION BY clause. Each node in the cluster partitions the data it stores, sends some of these partitions off to other nodes, and then consolidates the partitions it receives from other nodes and runs an instance of the UDTF to process them.
For example, the following UDTF partitions the input data by store ID and then computes the count of each distinct array element in each partition:
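The UDTF referred to above is not reproduced in this section. As a hedged plain-Python illustration of the same idea, the hypothetical function below groups rows by store ID (the effect of PARTITION BY). It then counts each distinct array element within each group.

```python
from collections import Counter, defaultdict


def count_elements_by_store(rows):
    """rows: (store_id, list_of_items). Returns {store_id: Counter(item -> count)}."""
    partitions = defaultdict(list)
    # Mimic PARTITION BY store_id: gather every row for a store into one partition.
    for store_id, items in rows:
        partitions[store_id].append(items)
    # One "processPartition" call per store: count each distinct element.
    return {store: Counter(item for items in arrays for item in items)
            for store, arrays in partitions.items()}


sales = [(1, ["x", "y"]), (2, ["x"]), (1, ["x"])]
result = count_elements_by_store(sales)
print(result[1]["x"], result[1]["y"], result[2]["x"])  # 2 1 1
```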
Some UDTFs, such as Explode, do not need to partition input data in a particular way. In these cases, you can specify that each UDTF instance process only the data that is stored locally by the node on which it is running. By eliminating the overhead of partitioning data and the cost of sort and merge operations, processing can be much more efficient.
You can use the following window partition options for UDTFs that do not require a specific data partitioning:
PARTITION ROW: For single-phase UDTFs where each partition is one input row, allows users to write SELECT queries that include any expression. The UDTF calls the processPartition() method once per input row. UDTFs of this type, often called one-to-many transforms, can be explicitly marked as such with the isExploder class property in the TransformFunctionFactory class. This class property helps Vertica optimize query plans and removes the need for an OVER clause. See One to Many UDTFs for details on how to set this class property for UDTFs written in C++ and Python.
PARTITION BEST: For thread-safe UDTFs only, optimizes performance through multi-threaded queries across multiple nodes. The UDTF calls the processPartition() method once per thread per node.
PARTITION NODES: Optimizes performance of single-threaded queries across multiple nodes. The UDTF calls the processPartition() method once per node.
For more information about these partition options, see Window partitioning.
To mark a UDTF as one-to-many, set the isExploder class property (is_exploder in Python) to True within the getTransformFunctionProperties() method. Whether a UDTF is marked as one-to-many can depend on the transform function's arguments and parameters, for example:
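The referenced example is not reproduced in this section. The hypothetical sketch below illustrates the idea with a stand-in Properties object whose is_exploder flag is set only when the function is called with exactly one argument. Both the Properties class and the single-argument rule are illustrative assumptions, not the SDK's.

```python
class Properties:
    """Hypothetical stand-in for the transform function properties object."""

    def __init__(self):
        self.is_exploder = False  # default, as described for isExploder


def get_transform_function_properties(arg_types):
    """Mark the UDTF one-to-many only when it receives a single argument.

    arg_types is a hypothetical list of argument type names.
    """
    props = Properties()
    props.is_exploder = len(arg_types) == 1
    return props


print(get_transform_function_properties(["VARCHAR"]).is_exploder)  # True
```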
If the isExploder class property is set to True, the OVER clause defaults to OVER(PARTITION ROW). This allows users to call the UDTF without specifying an OVER clause:
=> SELECT * FROM reviews;
id | sentence
1 | Customer service was slow
2 | Product is exactly what I needed
3 | Price is a bit high
4 | Highly recommended
(4 rows)
=> SELECT tokenize(sentence) FROM reviews;
(17 rows)
If a user writes an OVER clause with PARTITION BY for a function marked as one-to-many, the function replaces the OVER clause with OVER(PARTITION ROW) and emits a notice to the user.
One-to-many UDTFs also support any expression in the SELECT clause, unlike UDTFs that use either the PARTITION BEST or the PARTITION NODES clause:
=> SELECT id, tokenize(sentence) FROM reviews;
id | tokens
1 | Customer
1 | service
1 | was
1 | respond
2 | Product
3 | high
4 | Highly
4 | recommended
(17 rows)
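The PARTITION ROW behavior can be simulated in plain Python. The transform runs once per input row, and because each call sees the whole row, the id can be emitted alongside every token. The function names are hypothetical; the sentences come from the reviews table shown above.

```python
def tokenize_partition(row_id, sentence):
    """One processPartition call: a single-row partition exploded into (id, token) rows."""
    return [(row_id, token) for token in sentence.split()]


def run_partition_row(rows):
    """Mimic OVER(PARTITION ROW): call the transform once per input row."""
    calls, output = 0, []
    for row_id, sentence in rows:
        calls += 1
        output.extend(tokenize_partition(row_id, sentence))
    return calls, output


reviews = [
    (1, "Customer service was slow"),
    (2, "Product is exactly what I needed"),
    (3, "Price is a bit high"),
    (4, "Highly recommended"),
]
calls, rows = run_partition_row(reviews)
print(calls, len(rows))  # 4 17
```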
6 - C++ example: string tokenizer
The following example shows a subclass of TransformFunction named StringTokenizer. It defines a UDTF that reads a table containing an INTEGER ID column and a VARCHAR column. It breaks the text in the VARCHAR column into tokens (individual words). It returns a table containing each token, the row it occurred in, and its position within the string.
Loading and using the example
The following example shows how to load the function into Vertica. It assumes that the TransformFunctions.so library that contains the function has been copied to the dbadmin user's home directory on the initiator node.
=> CREATE LIBRARY TransformFunctions AS '/home/dbadmin/TransformFunctions.so';
=> CREATE TRANSFORM FUNCTION tokenize
AS LANGUAGE 'C++' NAME 'TokenFactory' LIBRARY TransformFunctions;
You can then use it from SQL statements, for example:
=> CREATE TABLE T (url varchar(30), description varchar(2000));
=> INSERT INTO T VALUES ('www.amazon.com','Online retail merchant and provider of cloud services');
(1 row)
=> INSERT INTO T VALUES ('www.vertica.com','World''s fastest analytic database');
(1 row)
=> -- Invoke the UDTF
=> SELECT url, tokenize(description) OVER (partition by url) FROM T;
url | words
www.amazon.com | Online
www.amazon.com | retail
www.amazon.com | merchant
www.amazon.com | and
www.amazon.com | provider
www.amazon.com | of
www.amazon.com | cloud
www.amazon.com | services
www.vertica.com | World's
www.vertica.com | fastest
www.vertica.com | analytic
www.vertica.com | database
(12 rows)
Notice that the result table has a different number of rows and columns from the input table. This is one of the strengths of a UDTF.
TransformFunction implementation
The following code shows the StringTokenizer class.
#include "Vertica.h"
#include <exception>
#include <sstream>
#include <string>

using namespace Vertica;

class StringTokenizer : public TransformFunction
{
    virtual void processPartition(ServerInterface &srvInterface,
                                  PartitionReader &inputReader,
                                  PartitionWriter &outputWriter)
    {
        try {
            if (inputReader.getNumCols() != 1)
                vt_report_error(0, "Function only accepts 1 argument, but %zu provided",
                                inputReader.getNumCols());

            do {
                const VString &sentence = inputReader.getStringRef(0);

                // If input string is NULL, then output is NULL as well
                if (sentence.isNull()) {
                    VString &word = outputWriter.getStringRef(0);
                    word.setNull();
                    outputWriter.next();
                } else {
                    // Otherwise, let's tokenize the string and output the words
                    std::string tmp = sentence.str();
                    std::istringstream ss(tmp);

                    do {
                        std::string buffer;
                        ss >> buffer;

                        // Copy to output
                        if (!buffer.empty()) {
                            VString &word = outputWriter.getStringRef(0);
                            word.copy(buffer);
                            outputWriter.next();
                        }
                    } while (ss);
                }
            } while (inputReader.next() && !isCanceled());
        } catch (std::exception &e) {
            // Standard exception. Quit.
            vt_report_error(0, "Exception while processing partition: [%s]", e.what());
        }
    }
};
The processPartition() function in this example follows a pattern that you will follow in your own UDTF: it loops over all rows in the table partition that Vertica sends it, processing each row and checking for cancellation before advancing. A UDTF does not have to process every row, though; it can return without reading all of its input. You might do this if your UDTF performs a search or some other operation where it can determine that the rest of the input is not needed.
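A minimal sketch of the early-exit pattern follows. A Python iterator stands in for the PartitionReader, which is a hypothetical simplification. The function returns as soon as it finds a match and leaves the rest of the partition unread.

```python
def find_first_match(reader_rows, needle):
    """Hypothetical UDTF body: stop reading as soon as the needle is found.

    reader_rows is any iterator standing in for PartitionReader; rows after
    the match are never consumed.
    """
    for position, value in enumerate(reader_rows):
        if value == needle:
            return [(position, value)]  # early exit: remaining input is not read
    return []


print(find_first_match(iter(["a", "b", "c"]), "b"))  # [(1, 'b')]
```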
In this example, processPartition() first extracts the VString containing the text from the PartitionReader object. The VString class represents a Vertica string value (VARCHAR or CHAR). If there is input, it then tokenizes it and adds it to the output using the PartitionWriter object.
Similarly to reading input columns, the PartitionWriter class has functions for writing each type of data to the output row. In this case, the example calls the PartitionWriter object's getStringRef() function to allocate a new VString object to hold the token to output for the first column, and then copies the token's value into the VString.
TransformFunctionFactory implementation
The following code shows the factory class.
class TokenFactory : public TransformFunctionFactory
{
    // Tell Vertica that we take in a row with 1 string, and return a row with 1 string
    virtual void getPrototype(ServerInterface &srvInterface,
                              ColumnTypes &argTypes,
                              ColumnTypes &returnType)
    {
        argTypes.addVarchar();
        returnType.addVarchar();
    }

    // Tell Vertica what our return string length will be, given the input
    // string length
    virtual void getReturnType(ServerInterface &srvInterface,
                               const SizedColumnTypes &inputTypes,
                               SizedColumnTypes &outputTypes)
    {
        // Error out if we're called with anything but 1 argument
        if (inputTypes.getColumnCount() != 1)
            vt_report_error(0, "Function only accepts 1 argument, but %zu provided",
                            inputTypes.getColumnCount());

        int input_len = inputTypes.getColumnType(0).getStringLength();

        // Our output size will never be more than the input size
        outputTypes.addVarchar(input_len, "words");
    }

    virtual TransformFunction *createTransformFunction(ServerInterface &srvInterface)
    {
        return vt_createFuncObject<StringTokenizer>(srvInterface.allocator);
    }
};
In this example:
The UDTF takes a VARCHAR column as input. To define the input column, getPrototype() calls addVarchar() on the ColumnTypes object that represents the input table.
The UDTF returns a VARCHAR as output. The getPrototype() function calls addVarchar() to define the output table.
getReturnType() must declare the maximum length of the VARCHAR output column. This example sets it to the length of the input string, which is safe because a token can never be longer than the string it came from. It also names the VARCHAR output column "words".
You are not required to supply a name for an output column in this function, but it is a best practice to do so. If you do not name an output column, getReturnType() sets the column name to "". SQL statements that call your UDTF must then provide aliases for any unnamed columns in order to access them; otherwise, they return an error. From a usability standpoint, it is easier for you to supply the column names once here than to force every user of your function to supply their own column names on each call to the UDTF.
The implementation of createTransformFunction() in the example is boilerplate code. It calls vt_createFuncObject, passing the TransformFunction subclass associated with this factory class (StringTokenizer) as the template argument and the server interface's allocator as the argument. This instantiates a copy of the TransformFunction class that Vertica can use to process data.
The RegisterFactory macro
The final step in creating your UDTF is to call the RegisterFactory macro. This macro ensures that your factory class is instantiated when Vertica loads the shared library containing your UDTF. Having your factory class instantiated is the only way that Vertica can find your UDTF and determine what its inputs and outputs are.
The RegisterFactory macro just takes the name of your factory class:
RegisterFactory(TokenFactory);
7 - Python example: string tokenizer
The following example shows a transform function that breaks an input string into tokens (based on whitespace). It is similar to the tokenizer examples for C++ and Java.
Loading and using the example
Create the library and function:
=> CREATE LIBRARY pyudtf AS '/home/dbadmin/udx/tokenize.py' LANGUAGE 'Python';
=> CREATE TRANSFORM FUNCTION tokenize AS NAME 'StringTokenizerFactory' LIBRARY pyudtf;
You can then use the function in SQL statements, for example:
=> COPY words FROM STDIN;
Enter data to be copied followed by a newline.
End with a backslash and a period on a line by itself.
>> this is a test of the python udtf
>> \.
=> SELECT tokenize(w) OVER () FROM words;
(8 rows)
All Python UDxs must import the Vertica SDK.
import vertica_sdk
UDTF Python code
The following code defines the tokenizer and its factory.
Transform function which tokenizes its inputs. For each input string, each of the whitespace-separated tokens of that string is produced as output.
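The Python code listing for this example is not reproduced in this section. The following is a hedged sketch of a tokenizer with the behavior described above. It uses hypothetical stand-in reader and writer classes instead of vertica_sdk. Their getString()/setString() methods are assumed by analogy with the getFloat()/setFloat() calls in the multi-phase example. Unlike the C++ version, which emits a NULL row for NULL input, this sketch skips NULL input.

```python
class StubReader:
    """Hypothetical stand-in for the partition reader (one VARCHAR column)."""

    def __init__(self, values):
        self._values = values
        self._pos = 0

    def isNull(self, col):
        return self._values[self._pos] is None

    def getString(self, col):
        return self._values[self._pos]

    def next(self):
        self._pos += 1
        return self._pos < len(self._values)


class StubWriter:
    """Hypothetical stand-in for the partition writer (one VARCHAR column)."""

    def __init__(self):
        self.rows = []
        self._value = None

    def setString(self, col, value):
        self._value = value

    def next(self):
        self.rows.append(self._value)
        self._value = None


class StringTokenizer:
    """Emit each whitespace-separated token of column 0 as its own output row."""

    def processPartition(self, server_interface, input, output):
        while True:
            # This sketch skips NULL inputs; the C++ version emits a NULL row instead.
            if not input.isNull(0):
                # str.split() with no argument splits on runs of whitespace.
                for token in input.getString(0).split():
                    output.setString(0, token)
                    output.next()
            if not input.next():
                break


out = StubWriter()
StringTokenizer().processPartition(None, StubReader(["this is a test of the python udtf"]), out)
print(len(out.rows))  # 8
```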
8 - R example: log tokenizer
The LogTokenizer transform function reads a varchar from a table, a log message. It then tokenizes each of the log messages, returning each of the tokens.
Querying data with the function
The following query shows how you can run a query with the UDTF.
LogTokenizer <- function(input.data.frame, parameters.data.frame) {
  # Take the spliton parameter passed by the user and assign it to a variable
  # in the function so we can use that as our tokenizer.
  if (is.null(parameters.data.frame[['spliton']])) {
    stop("NULL value for spliton! Token cannot be NULL.")
  } else {
    split.on <- as.character(parameters.data.frame[['spliton']])
  }
  # Tokenize the string.
  tokens <- vector(length = 0)
  for (string in input.data.frame[, 1]) {
    tokenized.string <- strsplit(string, split.on)
    for (token in tokenized.string) {
      tokens <- append(tokens, token)
    }
  }
  final.output <- data.frame(tokens)
  return(final.output)
}

LogTokenizerFactory <- function() {
  list(name = LogTokenizer,
       udxtype = c("transform"),
       intype = c("varchar"),
       outtype = c("varchar"),
       outtypecallback = LogTokenizerReturn,
       parametertypecallback = LogTokenizerParameters)
}

LogTokenizerParameters <- function() {
  parameters <- list(datatype = c("varchar"),
                     length = c("NA"),
                     scale = c("NA"),
                     name = c("spliton"))
  return(parameters)
}

LogTokenizerReturn <- function(arg.data.frame, parm.data.frame) {
  output.return.type <- data.frame(datatype = rep(NA, 1),
                                   length = rep(NA, 1),
                                   scale = rep(NA, 1),
                                   name = rep(NA, 1))
  output.return.type$datatype <- c("varchar")
  output.return.type$name <- c("Token")
  return(output.return.type)
}
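The core logic of LogTokenizer can be sketched in Python, as a translation for illustration only and not part of the R SDK. R's strsplit() treats its split argument as a regular expression by default, so the sketch uses re.split(). It also drops one trailing empty token, because strsplit() ignores a match at the very end of a string.

```python
import re


def log_tokenizer(messages, spliton):
    """Split every log message on the spliton pattern and return all tokens.

    Mirrors the R LogTokenizer: stop on a NULL separator, treat the separator
    as a regular expression, and drop one trailing empty token as strsplit()
    does. Assumes non-empty messages and a non-empty pattern.
    """
    if spliton is None:
        raise ValueError("NULL value for spliton! Token cannot be NULL.")
    tokens = []
    for message in messages:
        parts = re.split(spliton, message)
        if len(parts) > 1 and parts[-1] == "":
            parts.pop()
        tokens.extend(parts)
    return tokens


print(log_tokenizer(["GET /index.html 200", "POST /login 302"], " "))
# ['GET', '/index.html', '200', 'POST', '/login', '302']
```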
9 - C++ example: multi-phase indexer
The following code fragment is from the InvertedIndex UDTF example distributed with the Vertica SDK. It demonstrates subclassing the MultiPhaseTransformFunctionFactory including two TransformFunctionPhase subclasses that define the two phases in this UDTF.
#include "Vertica.h"
#include <vector>

using namespace Vertica;
using std::vector;

// ForwardIndexBuilder and InvertedIndexBuilder (the TransformFunction
// subclasses for each phase) are defined elsewhere in the example.

/**
 * Extracts terms from documents.
 */
class ForwardIndexPhase : public TransformFunctionPhase
{
    virtual void getReturnType(ServerInterface &srvInterface,
                               const SizedColumnTypes &inputTypes,
                               SizedColumnTypes &outputTypes)
    {
        // Sanity checks on input we've been given.
        // Expected input: (doc_id INTEGER, text VARCHAR)
        vector<size_t> argCols;
        inputTypes.getArgumentColumns(argCols);
        if (argCols.size() < 2 ||
            !inputTypes.getColumnType(argCols.at(0)).isInt() ||
            !inputTypes.getColumnType(argCols.at(1)).isVarchar())
            vt_report_error(0, "Function only accepts two arguments "
                               "(INTEGER, VARCHAR)");

        // Output of this phase is:
        //   (term_freq INTEGER) OVER(PBY term VARCHAR OBY doc_id INTEGER)

        // Number of times term appears within a document.
        outputTypes.addInt("term_freq");

        // Add analytic clause columns: (PARTITION BY term ORDER BY doc_id).
        // The length of any term is at most the size of the entire document.
        outputTypes.addVarcharPartitionColumn(
            inputTypes.getColumnType(argCols.at(1)).getStringLength(), "term");

        // Add order column on the basis of the document id's data type.
        outputTypes.addOrderColumn(inputTypes.getColumnType(argCols.at(0)), "doc_id");
    }

    virtual TransformFunction *createTransformFunction(ServerInterface &srvInterface)
    {
        return vt_createFuncObj(srvInterface.allocator, ForwardIndexBuilder);
    }
};

/**
 * Constructs terms' posting lists.
 */
class InvertedIndexPhase : public TransformFunctionPhase
{
    virtual void getReturnType(ServerInterface &srvInterface,
                               const SizedColumnTypes &inputTypes,
                               SizedColumnTypes &outputTypes)
    {
        // Sanity checks on input we've been given.
        // Expected input:
        //   (term_freq INTEGER) OVER(PBY term VARCHAR OBY doc_id INTEGER)
        vector<size_t> argCols;
        inputTypes.getArgumentColumns(argCols);
        vector<size_t> pByCols;
        inputTypes.getPartitionByColumns(pByCols);
        vector<size_t> oByCols;
        inputTypes.getOrderByColumns(oByCols);
        if (argCols.size() != 1 || pByCols.size() != 1 || oByCols.size() != 1 ||
            !inputTypes.getColumnType(argCols.at(0)).isInt() ||
            !inputTypes.getColumnType(pByCols.at(0)).isVarchar() ||
            !inputTypes.getColumnType(oByCols.at(0)).isInt())
            vt_report_error(0, "Function expects an argument (INTEGER) with "
                               "analytic clause OVER(PBY VARCHAR OBY INTEGER)");

        // Output of this phase is:
        //   (term VARCHAR, doc_id INTEGER, term_freq INTEGER, corp_freq INTEGER).
        outputTypes.addVarchar(inputTypes.getColumnType(pByCols.at(0)).getStringLength(), "term");
        outputTypes.addInt("doc_id");
        // Number of times term appears within the document.
        outputTypes.addInt("term_freq");
        // Number of documents in which the term appears.
        outputTypes.addInt("corp_freq");
    }

    virtual TransformFunction *createTransformFunction(ServerInterface &srvInterface)
    {
        return vt_createFuncObj(srvInterface.allocator, InvertedIndexBuilder);
    }
};

class InvertedIndexFactory : public MultiPhaseTransformFunctionFactory
{
    ForwardIndexPhase fwardIdxPh;
    InvertedIndexPhase invIdxPh;

    virtual void getPhases(ServerInterface &srvInterface,
                           std::vector<TransformFunctionPhase *> &phases)
    {
        fwardIdxPh.setPrepass(); // Process documents wherever they're originally stored.
        phases.push_back(&fwardIdxPh);
        phases.push_back(&invIdxPh);
    }

    virtual void getPrototype(ServerInterface &srvInterface,
                              ColumnTypes &argTypes,
                              ColumnTypes &returnType)
    {
        // Expected input: (doc_id INTEGER, text VARCHAR).
        argTypes.addInt();
        argTypes.addVarchar();

        // Output is: (term VARCHAR, doc_id INTEGER, term_freq INTEGER, corp_freq INTEGER)
        returnType.addVarchar();
        returnType.addInt();
        returnType.addInt();
        returnType.addInt();
    }
};

RegisterFactory(InvertedIndexFactory);
Most of the code in this example is similar to the code in a TransformFunctionFactory class:
Both TransformFunctionPhase subclasses implement the getReturnType() function, which describes the output of each stage. This is similar to the getReturnType() function from the TransformFunctionFactory class. However, this function also lets you control how the data is partitioned and ordered between each phase of your multi-phase UDTF.
The first phase calls SizedColumnTypes::addVarcharPartitionColumn() (rather than just addVarchar()) to set the phase's output table to be partitioned by the column containing the extracted words. It also calls SizedColumnTypes::addOrderColumn() to order the output table by the document ID column. It calls this function instead of one of the data-type-specific functions (such as addIntOrderColumn()) so it can pass the data type of the original column through to the output column.
Any ORDER BY or PARTITION BY column that the final phase of the UDTF sets in its getReturnType() function is ignored. The final phase's output is returned to the initiator node instead of being partitioned, reordered, and sent to another phase.
The MultiPhaseTransformFunctionFactory class implements the getPrototype() function, which defines the schemas for the input and output of the multi-phase UDTF. This function is the same as the TransformFunctionFactory::getPrototype() function.
The only function unique to the MultiPhaseTransformFunctionFactory class is getPhases(). This function defines the order in which the phases are executed: the fields that represent the phases are pushed into the phases vector in the order they should execute.
The MultiPhaseTransformFunctionFactory.getPhases() function is also where you flag the first phase of the UDTF as operating on data stored locally on the node (called a "pre-pass" phase) rather than on data partitioned across all nodes. Using this option increases the efficiency of your multi-phase UDTF by avoiding having to move significant amounts of data around the Vertica cluster.
Only the first phase of your UDTF can be a pre-pass phase. You cannot have multiple pre-pass phases, and no later phase can be a pre-pass phase.
To mark the first phase as pre-pass, call the TransformFunctionPhase::setPrepass() function of the first phase's TransformFunctionPhase instance from within the getPhases() function.
You need to ensure that the output schema of each phase matches the input schema expected by the next phase. In the example code, each TransformFunctionPhase::getReturnType() implementation performs a sanity check on its input and output schemas. Your TransformFunction subclasses can also perform these checks in their processPartition() function.
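A schema check like the ones in the getReturnType() implementations can also be run offline. The hypothetical helper below walks a list of phases in execution order and reports each adjacent pair whose output and input column types differ. It compares types by position and ignores column names, which is a simplifying assumption. The schemas in the usage example are flattened from the InvertedIndex phases shown above.

```python
def phase_chain_mismatches(phases):
    """Return (phase, next_phase) name pairs whose schemas do not line up.

    phases: list of (name, input_schema, output_schema) in execution order;
    each schema is a list of (column_name, type_name) pairs. Types are compared
    position by position; column names are ignored (a simplifying assumption).
    """
    mismatches = []
    for (name, _, out_schema), (next_name, in_schema, _) in zip(phases, phases[1:]):
        if [t for _, t in out_schema] != [t for _, t in in_schema]:
            mismatches.append((name, next_name))
    return mismatches


# Flattened from the InvertedIndex example: phase 1's argument, partition,
# and order columns become phase 2's input.
forward = ("ForwardIndexPhase",
           [("doc_id", "INTEGER"), ("text", "VARCHAR")],
           [("term_freq", "INTEGER"), ("term", "VARCHAR"), ("doc_id", "INTEGER")])
inverted = ("InvertedIndexPhase",
            [("term_freq", "INTEGER"), ("term", "VARCHAR"), ("doc_id", "INTEGER")],
            [("term", "VARCHAR"), ("doc_id", "INTEGER"),
             ("term_freq", "INTEGER"), ("corp_freq", "INTEGER")])

print(phase_chain_mismatches([forward, inverted]))  # []
print(phase_chain_mismatches([inverted, forward]))  # [('InvertedIndexPhase', 'ForwardIndexPhase')]
```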
There is no built-in limit on the number of phases that your multi-phase UDTF can have. However, more phases use more resources. When running in fenced mode, Vertica may terminate UDTFs that use too much memory. See Resource use for C++ UDxs.
10 - Python example: multi-phase calculation
The following example shows a multi-phase transform function that computes the average value on a column of numbers in an input table. It first defines two transform functions, and then defines a factory that creates the phases using them.
See AvgMultiPhaseUDT.py in the examples distribution for the complete code.
Loading and using the example
Create the library and function:
=> CREATE LIBRARY pylib_avg AS '/home/dbadmin/udx/AvgMultiPhaseUDT.py' LANGUAGE 'Python';
=> CREATE TRANSFORM FUNCTION myAvg AS LANGUAGE 'Python' NAME 'MyAvgFactory' LIBRARY pylib_avg;
You can then use the function in SELECT statements:
=> COPY numbers FROM STDIN delimiter ',';
=> SELECT myAvg(num) OVER() FROM numbers;
average | ignored_rows | total_rows
2.5 | 0 | 4
(1 row)
All Python UDxs must import the Vertica SDK. This example also imports the math library, which LocalCalculation uses to detect infinite and NaN values:
import vertica_sdk
import math
Component transform functions
A multi-phase transform function must define two or more TransformFunction subclasses to be used in the phases. This example uses two classes: LocalCalculation, which does calculations on local partitions, and GlobalCalculation, which aggregates the results of all LocalCalculation instances to calculate a final result.
In both classes, the calculation is done in the processPartition() method:
class LocalCalculation(vertica_sdk.TransformFunction):
    """
    This class is the first phase and calculates the local values for sum, ignored_rows and total_rows.
    """
    def setup(self, server_interface, col_types):
        server_interface.log("Setup: Phase0")
        self.local_sum = 0.0
        self.ignored_rows = 0
        self.total_rows = 0

    def processPartition(self, server_interface, input, output):
        server_interface.log("Process Partition: Phase0")
        while True:
            self.total_rows += 1
            if input.isNull(0) or math.isinf(input.getFloat(0)) or math.isnan(input.getFloat(0)):
                # Null, Inf, or NaN is ignored
                self.ignored_rows += 1
            else:
                self.local_sum += input.getFloat(0)
            if not input.next():
                break
        # Emit one row per local partition: (local_sum, ignored_rows, total_rows)
        output.setFloat(0, self.local_sum)
        output.setInt(1, self.ignored_rows)
        output.setInt(2, self.total_rows)
        output.next()

class GlobalCalculation(vertica_sdk.TransformFunction):
    """
    This class is the second phase and aggregates the values for sum, ignored_rows and total_rows.
    """
    def setup(self, server_interface, col_types):
        server_interface.log("Setup: Phase1")
        self.global_sum = 0.0
        self.ignored_rows = 0
        self.total_rows = 0

    def processPartition(self, server_interface, input, output):
        server_interface.log("Process Partition: Phase1")
        while True:
            self.global_sum += input.getFloat(0)
            self.ignored_rows += input.getInt(1)
            self.total_rows += input.getInt(2)
            if not input.next():
                break
        # Note: raises ZeroDivisionError if every input row was ignored
        average = self.global_sum / (self.total_rows - self.ignored_rows)
        output.setFloat(0, average)
        output.setInt(1, self.ignored_rows)
        output.setInt(2, self.total_rows)
        output.next()
Multi-phase factory
A MultiPhaseTransformFunctionFactory ties together the individual functions as phases. The factory defines a TransformFunctionPhase for each function. Each phase defines two methods: createTransformFunction(), which calls the constructor of the corresponding TransformFunction, and getReturnType(), which defines the phase's output columns.
The first phase, LocalPhase, follows.
class MyAvgFactory(vertica_sdk.MultiPhaseTransformFunctionFactory):
    """ Factory class """

    class LocalPhase(vertica_sdk.TransformFunctionPhase):
        """ Phase 1 """
        def getReturnType(self, server_interface, input_types, output_types):
            # Sanity check: exactly one FLOAT argument
            number_of_cols = input_types.getColumnCount()
            if number_of_cols != 1 or not input_types.getColumnType(0).isFloat():
                raise ValueError("Function only accepts one argument (FLOAT)")
            output_types.addFloat("local_sum")
            output_types.addInt("ignored_rows")
            output_types.addInt("total_rows")

        def createTransformFunction(cls, server_interface):
            return LocalCalculation()
The second phase, GlobalPhase, does not check its inputs because the first phase already validated them. As with the first phase, its createTransformFunction() method only constructs and returns the corresponding TransformFunction.
11 - Python example: count elements
The following example details a UDTF that takes a partition of arrays, computes the count of each distinct array element in the partition, and outputs each element and its count as a row value. You can call the function on tables that contain multiple partitions of arrays.
The complete source code is in /opt/vertica/sdk/examples/python/TransformFunctions.py.
Loading and using the example
Load the library and create the transform function as follows:
=> CREATE OR REPLACE LIBRARY TransformFunctions AS '/home/dbadmin/examples/python/TransformFunctions.py' LANGUAGE 'Python';
=> CREATE TRANSFORM FUNCTION CountElements AS LANGUAGE 'Python' NAME 'countElementsUDTFactory' LIBRARY TransformFunctions;
You can create some data and then call the function on it, for example:
getReturnType() validates that the function takes exactly one argument and that the argument is an array, and then defines the return type as a row named element_count with 'element' and 'count' fields:
def getReturnType(self, srv_interface, arg_types, return_type):
    if arg_types.getColumnCount() != 1:
        srv_interface.reportError(1, 'countElements UDT should take exactly one argument')
    if not arg_types.getColumnType(0).isArrayType():
        srv_interface.reportError(2, 'Argument to countElements UDT should be an ARRAY')
    retRowFields = vertica_sdk.SizedColumnTypes.makeEmpty()
    retRowFields.addColumn(arg_types.getColumnType(0).getElementType(), 'element')
    retRowFields.addInt('count')
    return_type.addRowType(retRowFields, 'element_count')
The countElementsUDTFactory class also contains a createTransformFunction() method that instantiates and returns the transform function.
Function implementation
The processPartition() method is called with a PartitionReader and a PartitionWriter, named arg_reader and res_writer respectively. The function loops through all the input arrays in a partition, skipping NULL arrays, and uses a dictionary to collect the frequency of each element. To access the elements of each input array, the method calls getArray() and iterates over the result. After collecting the element counts, the function writes each element and its count to a row. If the partition contained no elements, it writes a single row whose element is None and whose count is 0, so that every partition produces output. This process is repeated for each partition.
def processPartition(self, srv_interface, arg_reader, res_writer):
    elemCounts = dict()
    # Collect element counts for entire partition
    while True:
        if not arg_reader.isNull(0):
            arr = arg_reader.getArray(0)
            for elem in arr:
                elemCounts[elem] = elemCounts.setdefault(elem, 0) + 1
        if not arg_reader.next():
            break
    # Write at least one value for each partition
    if len(elemCounts) == 0:
        elemCounts[None] = 0
    # Write out element counts as (element, count) pairs
    for pair in elemCounts.items():
        res_writer.setRow(0, pair)
        res_writer.next()
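The counting logic can be tried outside Vertica with the following sketch. It is a plain-Python mirror of the processPartition() body above, not SDK code: count_elements() is a hypothetical name, a list of Python lists (with None for a NULL array) stands in for one partition read through arg_reader, and the returned list of tuples stands in for the rows written with res_writer.

```python
from typing import Dict, Iterable, List, Optional, Tuple


def count_elements(partition: Iterable[Optional[List[object]]]) -> List[Tuple[object, int]]:
    """Plain-Python mirror of the countElements processPartition(): returns (element, count) rows."""
    elem_counts: Dict[object, int] = {}
    for arr in partition:
        if arr is None:  # NULL arrays are skipped, like the arg_reader.isNull(0) check
            continue
        for elem in arr:
            elem_counts[elem] = elem_counts.setdefault(elem, 0) + 1
    # Emit at least one row per partition, even if it contained no elements.
    if not elem_counts:
        elem_counts[None] = 0
    return list(elem_counts.items())


print(count_elements([[1, 2, 2], None, [2, 3]]))  # [(1, 1), (2, 3), (3, 1)]
print(count_elements([None]))                     # [(None, 0)]
```

The dictionary accumulates counts across every array in the partition, so an element that appears in several rows (2 in this sample) is reported once with its combined count.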
12 - Python example: explode
The following example details a UDTF that accepts a one-dimensional array as input and outputs each element of the array as a separate row, similar to functions commonly known as EXPLODE. Because this UDTF always accepts one array as input, you can explicitly mark it as a one-to-many UDTF in the factory function, which helps Vertica optimize query plans and allows users to write SELECT queries that include any expression and do not require an OVER clause.
The complete source code is in /opt/vertica/sdk/examples/python/TransformFunctions.py.
Loading and using the example
Load the library and create the transform function as follows:
=> CREATE OR REPLACE LIBRARY PyTransformFunctions AS '/opt/vertica/sdk/examples/python/TransformFunctions.py' LANGUAGE 'Python';
=> CREATE TRANSFORM FUNCTION py_explode AS LANGUAGE 'Python' NAME 'ExplodeFactory' LIBRARY PyTransformFunctions;
You can then use the function in SQL statements, for example:
=> CREATE TABLE reviews (id INTEGER PRIMARY KEY, sentiment VARCHAR(16), review ARRAY[VARCHAR(16), 32]);
=> INSERT INTO reviews VALUES(1, 'Very Negative', string_to_array('This was the worst restaurant I have ever had the misfortune of eating at' USING PARAMETERS collection_delimiter = ' ')),
(2, 'Neutral', string_to_array('This restaurant is pretty decent' USING PARAMETERS collection_delimiter = ' ')),
(3, 'Very Positive', string_to_array('Best restaurant in the Western Hemisphere' USING PARAMETERS collection_delimiter = ' ')),
(4, 'Positive', string_to_array('Prices low for the area' USING PARAMETERS collection_delimiter = ' '));
(1 row)
=> SELECT id, sentiment, py_explode(review) FROM reviews; --no OVER clause because "is_exploder = True", see below
id | sentiment | element
1 | Very Negative | This
1 | Very Negative | was
1 | Very Negative | the
1 | Very Negative | worst
1 | Very Negative | restaurant
1 | Very Negative | I
1 | Very Negative | have
...
3 | Very Positive | Western
3 | Very Positive | Hemisphere
4 | Positive | Prices
4 | Positive | low
4 | Positive | for
4 | Positive | the
4 | Positive | area
(30 rows)
All Python UDxs must import the Vertica SDK library:
import vertica_sdk
Factory implementation
The following code shows the ExplodeFactory class.
class ExplodeFactory(vertica_sdk.TransformFunctionFactory):
    def getPrototype(self, srv_interface, arg_types, return_type):
        arg_types.addAny()
        return_type.addAny()

    def getTransformFunctionProperties(cls, server_interface, arg_types):
        props = vertica_sdk.TransformFunctionFactory.Properties()
        props.is_exploder = True
        return props

    def getReturnType(self, srv_interface, arg_types, return_type):
        if arg_types.getColumnCount() != 1:
            srv_interface.reportError(1, 'explode UDT should take exactly one argument')
        if not arg_types.getColumnType(0).isArrayType():
            srv_interface.reportError(2, 'Argument to explode UDT should be an ARRAY')
        return_type.addColumn(arg_types.getColumnType(0).getElementType(), 'element')

    def createTransformFunction(cls, server_interface):
        return Explode()
In this example:
The getTransformFunctionProperties() method sets the is_exploder property to True, explicitly marking the UDTF as one-to-many. As a result, the function uses an OVER(PARTITION ROW) clause by default, so callers do not need to specify an OVER clause. With is_exploder set to True, users can also write SELECT queries that include any expression, which queries that use PARTITION BEST or PARTITION NODES cannot do.
The getReturnType() method verifies that the function takes exactly one argument and that the argument is an ARRAY. It then defines a single output column, element, whose type is the element type of the input array.
Function implementation
The following code shows the Explode class:
class Explode(vertica_sdk.TransformFunction):
    """
    Transform function that turns an array into one row
    for each array element.
    """
The processPartition() method accepts a single row of input data, processes each element of the array, and then exits the loop. It reads the elements of the array with an ArrayReader object and uses an ArrayWriter object to write each element to a separate output row. Because the function uses OVER(PARTITION ROW) by default, Vertica calls processPartition() once for each input row.
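The effect of is_exploder on the earlier query can be modeled in plain Python. This sketch does not use the Vertica SDK: explode() and select_with_explode() are hypothetical names, each input row is treated as its own partition (as with OVER(PARTITION ROW)), and str.split(" ") stands in for string_to_array with a space delimiter. It reproduces the shape of the SELECT output shown above, where id and sentiment repeat on every output row.

```python
from typing import Iterable, Iterator, List, Tuple


def explode(array: List[str]) -> Iterator[str]:
    """Plain-Python stand-in for Explode: yields one value per array element."""
    for element in array:
        yield element


def select_with_explode(rows: Iterable[Tuple[int, str, List[str]]]) -> List[Tuple[int, str, str]]:
    """Models SELECT id, sentiment, py_explode(review) FROM reviews with OVER(PARTITION ROW):
    each input row is its own partition, and its id and sentiment repeat on every output row."""
    result: List[Tuple[int, str, str]] = []
    for row_id, sentiment, review in rows:
        for element in explode(review):
            result.append((row_id, sentiment, element))
    return result


reviews = [
    (1, "Very Negative", "This was the worst restaurant I have ever had the misfortune of eating at".split(" ")),
    (2, "Neutral", "This restaurant is pretty decent".split(" ")),
    (3, "Very Positive", "Best restaurant in the Western Hemisphere".split(" ")),
    (4, "Positive", "Prices low for the area".split(" ")),
]
output = select_with_explode(reviews)
print(len(output))  # 30
print(output[0])    # (1, 'Very Negative', 'This')
```

The 14 + 5 + 6 + 5 array elements produce the 30 rows reported by the query.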