Python example: multi-phase calculation
The following example shows a multi-phase transform function that computes the average value on a column of numbers in an input table. It first defines two transform functions, and then defines a factory that creates the phases using them.
See AvgMultiPhaseUDT.py
in the examples distribution for the complete code.
Loading and using the example
Create the library and function:
=> CREATE LIBRARY pylib_avg AS '/home/dbadmin/udx/AvgMultiPhaseUDT.py' LANGUAGE 'Python';
CREATE LIBRARY
=> CREATE TRANSFORM FUNCTION myAvg AS NAME 'MyAvgFactory' LIBRARY pylib_avg;
CREATE TRANSFORM FUNCTION
You can then use the function in SELECT statements:
=> CREATE TABLE IF NOT EXISTS numbers(num FLOAT);
CREATE TABLE
=> COPY numbers FROM STDIN delimiter ',';
1
2
3
4
\.
=> SELECT myAvg(num) OVER() FROM numbers;
average | ignored_rows | total_rows
---------+--------------+------------
2.5 | 0 | 4
(1 row)
Setup
All Python UDxs must import the Vertica SDK. This example also imports another library.
import vertica_sdk
import math
Component transform functions
A multi-phase transform function must define two or more TransformFunction
subclasses to be used in the phases. This example uses two classes: LocalCalculation
, which does calculations on local partitions, and GlobalCalculation
, which aggregates the results of all LocalCalculation
instances to calculate a final result.
In both functions, the calculation is done in the processPartition()
function:
class LocalCalculation(vertica_sdk.TransformFunction):
"""
This class is the first phase and calculates the local values for sum, ignored_rows and total_rows.
"""
def setup(self, server_interface, col_types):
server_interface.log("Setup: Phase0")
self.local_sum = 0.0
self.ignored_rows = 0
self.total_rows = 0
def processPartition(self, server_interface, input, output):
server_interface.log("Process Partition: Phase0")
while True:
self.total_rows += 1
if input.isNull(0) or math.isinf(input.getFloat(0)) or math.isnan(input.getFloat(0)):
# Null, Inf, or Nan is ignored
self.ignored_rows += 1
else:
self.local_sum += input.getFloat(0)
if not input.next():
break
output.setFloat(0, self.local_sum)
output.setInt(1, self.ignored_rows)
output.setInt(2, self.total_rows)
output.next()
class GlobalCalculation(vertica_sdk.TransformFunction):
"""
This class is the second phase and aggregates the values for sum, ignored_rows and total_rows.
"""
def setup(self, server_interface, col_types):
server_interface.log("Setup: Phase1")
self.global_sum = 0.0
self.ignored_rows = 0
self.total_rows = 0
def processPartition(self, server_interface, input, output):
server_interface.log("Process Partition: Phase1")
while True:
self.global_sum += input.getFloat(0)
self.ignored_rows += input.getInt(1)
self.total_rows += input.getInt(2)
if not input.next():
break
average = self.global_sum / (self.total_rows - self.ignored_rows)
output.setFloat(0, average)
output.setInt(1, self.ignored_rows)
output.setInt(2, self.total_rows)
output.next()
Multi-phase factory
A MultiPhaseTransformFunctionFactory
ties together the individual functions as phases. The factory defines a TransformFunctionPhase
for each function. Each phase defines createTransformFunction()
, which calls the constructor for the corresponding TransformFunction
, and getReturnType()
.
The first phase, LocalPhase
, follows.
class MyAvgFactory(vertica_sdk.MultiPhaseTransformFunctionFactory):
""" Factory class """
class LocalPhase(vertica_sdk.TransformFunctionPhase):
""" Phase 1 """
def getReturnType(self, server_interface, input_types, output_types):
# sanity check
number_of_cols = input_types.getColumnCount()
if (number_of_cols != 1 or not input_types.getColumnType(0).isFloat()):
raise ValueError("Function only accepts one argument (FLOAT))")
output_types.addFloat("local_sum");
output_types.addInt("ignored_rows");
output_types.addInt("total_rows");
def createTransformFunction(cls, server_interface):
return LocalCalculation()
The second phase, GlobalPhase
, does not check its inputs because the first phase already did. As with the first phase, createTransformFunction
merely constructs and returns the corresponding TransformFunction
.
class GlobalPhase(vertica_sdk.TransformFunctionPhase):
""" Phase 2 """
def getReturnType(self, server_interface, input_types, output_types):
output_types.addFloat("average");
output_types.addInt("ignored_rows");
output_types.addInt("total_rows");
def createTransformFunction(cls, server_interface):
return GlobalCalculation()
After defining the TransformFunctionPhase
subclasses, the factory instantiates them and chains them together in getPhases()
.
ph0Instance = LocalPhase()
ph1Instance = GlobalPhase()
def getPhases(cls, server_interface):
cls.ph0Instance.setPrepass()
phases = [cls.ph0Instance, cls.ph1Instance]
return phases