C++ example: multi-phase indexer
The following code fragment is from the InvertedIndex UDTF example distributed with the Vertica SDK. It demonstrates subclassing MultiPhaseTransformFunctionFactory, including the two TransformFunctionPhase subclasses that define the two phases in this UDTF.
class InvertedIndexFactory : public MultiPhaseTransformFunctionFactory
{
public:
    /**
     * Extracts terms from documents.
     */
    class ForwardIndexPhase : public TransformFunctionPhase
    {
        virtual void getReturnType(ServerInterface &srvInterface,
                                   const SizedColumnTypes &inputTypes,
                                   SizedColumnTypes &outputTypes)
        {
            // Sanity checks on input we've been given.
            // Expected input: (doc_id INTEGER, text VARCHAR)
            vector<size_t> argCols;
            inputTypes.getArgumentColumns(argCols);
            if (argCols.size() < 2 ||
                !inputTypes.getColumnType(argCols.at(0)).isInt() ||
                !inputTypes.getColumnType(argCols.at(1)).isVarchar())
                vt_report_error(0, "Function only accepts two arguments "
                                   "(INTEGER, VARCHAR)");

            // Output of this phase is:
            //   (term_freq INTEGER) OVER(PBY term VARCHAR OBY doc_id INTEGER)

            // Number of times term appears within a document.
            outputTypes.addInt("term_freq");

            // Add analytic clause columns: (PARTITION BY term ORDER BY doc_id).
            // The length of any term is at most the size of the entire document.
            outputTypes.addVarcharPartitionColumn(
                inputTypes.getColumnType(argCols.at(1)).getStringLength(),
                "term");

            // Add order column on the basis of the document id's data type.
            outputTypes.addOrderColumn(inputTypes.getColumnType(argCols.at(0)),
                                       "doc_id");
        }

        virtual TransformFunction *createTransformFunction(ServerInterface &srvInterface)
        { return vt_createFuncObj(srvInterface.allocator, ForwardIndexBuilder); }
    };

    /**
     * Constructs terms' posting lists.
     */
    class InvertedIndexPhase : public TransformFunctionPhase
    {
        virtual void getReturnType(ServerInterface &srvInterface,
                                   const SizedColumnTypes &inputTypes,
                                   SizedColumnTypes &outputTypes)
        {
            // Sanity checks on input we've been given.
            // Expected input:
            //   (term_freq INTEGER) OVER(PBY term VARCHAR OBY doc_id INTEGER)
            vector<size_t> argCols;
            inputTypes.getArgumentColumns(argCols);
            vector<size_t> pByCols;
            inputTypes.getPartitionByColumns(pByCols);
            vector<size_t> oByCols;
            inputTypes.getOrderByColumns(oByCols);
            if (argCols.size() != 1 || pByCols.size() != 1 || oByCols.size() != 1 ||
                !inputTypes.getColumnType(argCols.at(0)).isInt() ||
                !inputTypes.getColumnType(pByCols.at(0)).isVarchar() ||
                !inputTypes.getColumnType(oByCols.at(0)).isInt())
                vt_report_error(0, "Function expects an argument (INTEGER) with "
                                   "analytic clause OVER(PBY VARCHAR OBY INTEGER)");

            // Output of this phase is:
            //   (term VARCHAR, doc_id INTEGER, term_freq INTEGER, corp_freq INTEGER).
            outputTypes.addVarchar(
                inputTypes.getColumnType(pByCols.at(0)).getStringLength(), "term");
            outputTypes.addInt("doc_id");

            // Number of times term appears within the document.
            outputTypes.addInt("term_freq");

            // Number of documents in which the term appears.
            outputTypes.addInt("corp_freq");
        }

        virtual TransformFunction *createTransformFunction(ServerInterface &srvInterface)
        { return vt_createFuncObj(srvInterface.allocator, InvertedIndexBuilder); }
    };

    ForwardIndexPhase fwardIdxPh;
    InvertedIndexPhase invIdxPh;

    virtual void getPhases(ServerInterface &srvInterface,
                           std::vector<TransformFunctionPhase *> &phases)
    {
        fwardIdxPh.setPrepass(); // Process documents wherever they're originally stored.
        phases.push_back(&fwardIdxPh);
        phases.push_back(&invIdxPh);
    }

    virtual void getPrototype(ServerInterface &srvInterface,
                              ColumnTypes &argTypes,
                              ColumnTypes &returnType)
    {
        // Expected input: (doc_id INTEGER, text VARCHAR).
        argTypes.addInt();
        argTypes.addVarchar();

        // Output is: (term VARCHAR, doc_id INTEGER, term_freq INTEGER, corp_freq INTEGER)
        returnType.addVarchar();
        returnType.addInt();
        returnType.addInt();
        returnType.addInt();
    }
};

RegisterFactory(InvertedIndexFactory);
Most of the code in this example is similar to the code in a TransformFunctionFactory class:
- Both TransformFunctionPhase subclasses implement the getReturnType() function, which describes the output of each stage. This is similar to the getReturnType() function from the TransformFunctionFactory class. However, this function also lets you control how the data is partitioned and ordered between the phases of your multi-phase UDTF.
  The first phase calls SizedColumnTypes::addVarcharPartitionColumn() (rather than just addVarchar()) to set the phase's output table to be partitioned by the column containing the extracted words. It also calls SizedColumnTypes::addOrderColumn() to order the output table by the document ID column. It calls this function instead of one of the data-type-specific functions (such as addIntOrderColumn()) so it can pass the data type of the original column through to the output column.
  Note
  Any order-by column or partition-by column set by the final phase of the UDTF in its getReturnType() function is ignored. The final phase's output is returned to the initiator node rather than being partitioned, reordered, and sent to another phase.
- The MultiPhaseTransformFunctionFactory class implements the getPrototype() function, which defines the schemas for the input and output of the multi-phase UDTF. This function is the same as the TransformFunctionFactory::getPrototype() function.
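To make the partitioning and ordering between the two phases concrete, the following stand-alone sketch emulates the data flow in plain C++ without the Vertica SDK. It is not the SDK's ForwardIndexBuilder or InvertedIndexBuilder code: the row structs and the functions forwardIndex() and invertedIndex() are hypothetical names, and whitespace tokenization is an assumption made only for illustration.

```cpp
#include <cassert>
#include <cstdint>
#include <map>
#include <sstream>
#include <string>
#include <vector>

// Hypothetical row types for illustration; these are NOT Vertica SDK types.
struct Phase1Row { std::string term; std::int64_t docId; std::int64_t termFreq; };
struct Phase2Row { std::string term; std::int64_t docId; std::int64_t termFreq; std::int64_t corpFreq; };

// Phase 1: count how often each term occurs in a single document.
// Assumption: terms are separated by whitespace.
std::vector<Phase1Row> forwardIndex(std::int64_t docId, const std::string &text)
{
    std::map<std::string, std::int64_t> counts;
    std::istringstream in(text);
    std::string term;
    while (in >> term)
        ++counts[term];

    std::vector<Phase1Row> rows;
    for (const auto &entry : counts)
        rows.push_back({entry.first, docId, entry.second});
    return rows;
}

// Phase 2: emulate PARTITION BY term ORDER BY doc_id with nested ordered maps,
// then emit one row per (term, doc_id) carrying the term's document count.
std::vector<Phase2Row> invertedIndex(const std::vector<Phase1Row> &phase1Rows)
{
    // Outer key = partition (term); inner key = order column (doc_id).
    std::map<std::string, std::map<std::int64_t, std::int64_t>> partitions;
    for (const auto &row : phase1Rows) {
        assert(row.termFreq > 0); // phase 1 only emits terms it has seen
        partitions[row.term][row.docId] += row.termFreq;
    }

    std::vector<Phase2Row> out;
    for (const auto &partition : partitions) {
        // corp_freq: number of distinct documents containing this term.
        const std::int64_t corpFreq =
            static_cast<std::int64_t>(partition.second.size());
        for (const auto &posting : partition.second)
            out.push_back({partition.first, posting.first, posting.second, corpFreq});
    }
    return out;
}
```

In this sketch, feeding the phase 1 rows for documents (1, "a b a") and (2, "b c") into invertedIndex() yields four rows ordered by term and then by doc_id, and the term "b" carries a corp_freq of 2 because it appears in both documents.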
The one function unique to the MultiPhaseTransformFunctionFactory class is getPhases(). This function defines the order in which the phases are executed. The fields that represent the phases are pushed into the phases vector in the order they should execute.
The MultiPhaseTransformFunctionFactory::getPhases() function is also where you flag the first phase of the UDTF as operating on data stored locally on the node (called a "pre-pass" phase) rather than on data partitioned across all nodes. Using this option increases the efficiency of your multi-phase UDTF by avoiding having to move significant amounts of data around the Vertica cluster.
Note
Only the first phase of your UDTF can be a pre-pass phase. You cannot have multiple pre-pass phases, and no later phase can be a pre-pass phase.
To mark the first phase as pre-pass, call the TransformFunctionPhase::setPrepass() function of the first phase's TransformFunctionPhase instance from within the getPhases() function.
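The ordering and pre-pass rules can be illustrated with a small stand-alone sketch that does not use the Vertica SDK. PhaseSketch, getPhasesSketch(), and prepassPlacementIsValid() are hypothetical names invented for this illustration. The sketch only restates the rule as a check; it makes no claim about how or when Vertica itself enforces it.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for a phase object; NOT the SDK's TransformFunctionPhase.
struct PhaseSketch {
    std::string name;
    bool prepass;

    explicit PhaseSketch(std::string n) : name(std::move(n)), prepass(false) {}
    void setPrepass() { prepass = true; }
};

// Mirrors the shape of getPhases(): flag the first phase as pre-pass, then
// push the phases in the order they should execute.
void getPhasesSketch(PhaseSketch &first, PhaseSketch &second,
                     std::vector<PhaseSketch *> &phases)
{
    first.setPrepass();
    phases.push_back(&first);
    phases.push_back(&second);
}

// Returns true when no phase after index 0 is flagged as pre-pass.
bool prepassPlacementIsValid(const std::vector<PhaseSketch *> &phases)
{
    for (std::size_t i = 1; i < phases.size(); ++i) {
        assert(phases[i] != nullptr);
        if (phases[i]->prepass)
            return false;
    }
    return true;
}
```

Keeping the pre-pass flag on the phase object, as the example factory does, means the order of push_back() calls is the only thing that determines execution order.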
Notes
- You need to ensure that the output schema of each phase matches the input schema expected by the next phase. In the example code, each TransformFunctionPhase::getReturnType() implementation performs a sanity check on its input schema before declaring its output schema. Your TransformFunction subclasses can also perform these checks in their processPartition() function.
- There is no built-in limit on the number of phases that your multi-phase UDTF can have. However, more phases use more resources. When running in fenced mode, Vertica may terminate UDTFs that use too much memory. See Resource use for C++ UDxs.
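The schema-matching requirement in the first note can be sketched as a column-by-column comparison. The code below is a stand-alone illustration that does not use the Vertica SDK: ColKind, ColRole, ColumnSketch, SchemaSketch, and schemasMatch() are hypothetical names, and comparing only the data type and the argument, partition-by, or order-by role of each column is a simplifying assumption.

```cpp
#include <cassert> // used by the assert-based usage described in the note below
#include <cstddef>
#include <string>
#include <vector>

// Hypothetical schema description; NOT the SDK's SizedColumnTypes.
enum class ColKind { Integer, Varchar };
enum class ColRole { Argument, PartitionBy, OrderBy };

struct ColumnSketch {
    std::string name;
    ColKind kind;
    ColRole role;
};

using SchemaSketch = std::vector<ColumnSketch>;

// Returns true when both schemas have the same number of columns and every
// column pair agrees on data type and role. Column names are not compared.
bool schemasMatch(const SchemaSketch &produced, const SchemaSketch &expected)
{
    if (produced.size() != expected.size())
        return false;
    for (std::size_t i = 0; i < produced.size(); ++i) {
        if (produced[i].kind != expected[i].kind ||
            produced[i].role != expected[i].role)
            return false;
    }
    return true;
}
```

For the example in this section, the first phase's output would be described as an INTEGER argument column (term_freq), a VARCHAR partition-by column (term), and an INTEGER order-by column (doc_id). A check such as assert(schemasMatch(phase1Out, phase2In)) would then fail if the second phase expected, say, a VARCHAR order-by column instead.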