C++ 示例:多阶段索引器
以下代码片段来自随 Vertica SDK 一起分发的 InvertedIndex UDTF 示例。此代码片段演示了将 MultiPhaseTransformFunctionFactory
子类化以及添加两个 TransformFunctionPhase
子类(定义此 UDTF 中的两个阶段)。
class InvertedIndexFactory : public MultiPhaseTransformFunctionFactory
{
public:
/**
* Extracts terms from documents.
*/
class ForwardIndexPhase : public TransformFunctionPhase
{
virtual void getReturnType(ServerInterface &srvInterface,
const SizedColumnTypes &inputTypes,
SizedColumnTypes &outputTypes)
{
// Sanity checks on input we've been given.
// Expected input: (doc_id INTEGER, text VARCHAR)
vector<size_t> argCols;
inputTypes.getArgumentColumns(argCols);
if (argCols.size() < 2 ||
!inputTypes.getColumnType(argCols.at(0)).isInt() ||
!inputTypes.getColumnType(argCols.at(1)).isVarchar())
vt_report_error(0, "Function only accepts two arguments"
"(INTEGER, VARCHAR))");
// Output of this phase is:
// (term_freq INTEGER) OVER(PBY term VARCHAR OBY doc_id INTEGER)
// Number of times term appears within a document.
outputTypes.addInt("term_freq");
// Add analytic clause columns: (PARTITION BY term ORDER BY doc_id).
// The length of any term is at most the size of the entire document.
outputTypes.addVarcharPartitionColumn(
inputTypes.getColumnType(argCols.at(1)).getStringLength(),
"term");
// Add order column on the basis of the document id's data type.
outputTypes.addOrderColumn(inputTypes.getColumnType(argCols.at(0)),
"doc_id");
}
virtual TransformFunction *createTransformFunction(ServerInterface
&srvInterface)
{ return vt_createFuncObj(srvInterface.allocator, ForwardIndexBuilder); }
};
/**
* Constructs terms' posting lists.
*/
class InvertedIndexPhase : public TransformFunctionPhase
{
virtual void getReturnType(ServerInterface &srvInterface,
const SizedColumnTypes &inputTypes,
SizedColumnTypes &outputTypes)
{
// Sanity checks on input we've been given.
// Expected input:
// (term_freq INTEGER) OVER(PBY term VARCHAR OBY doc_id INTEGER)
vector<size_t> argCols;
inputTypes.getArgumentColumns(argCols);
vector<size_t> pByCols;
inputTypes.getPartitionByColumns(pByCols);
vector<size_t> oByCols;
inputTypes.getOrderByColumns(oByCols);
if (argCols.size() != 1 || pByCols.size() != 1 || oByCols.size() != 1 ||
!inputTypes.getColumnType(argCols.at(0)).isInt() ||
!inputTypes.getColumnType(pByCols.at(0)).isVarchar() ||
!inputTypes.getColumnType(oByCols.at(0)).isInt())
vt_report_error(0, "Function expects an argument (INTEGER) with "
"analytic clause OVER(PBY VARCHAR OBY INTEGER)");
// Output of this phase is:
// (term VARCHAR, doc_id INTEGER, term_freq INTEGER, corp_freq INTEGER).
outputTypes.addVarchar(inputTypes.getColumnType(
pByCols.at(0)).getStringLength(),"term");
outputTypes.addInt("doc_id");
// Number of times term appears within the document.
outputTypes.addInt("term_freq");
// Number of documents where the term appears in.
outputTypes.addInt("corp_freq");
}
virtual TransformFunction *createTransformFunction(ServerInterface
&srvInterface)
{ return vt_createFuncObj(srvInterface.allocator, InvertedIndexBuilder); }
};
ForwardIndexPhase fwardIdxPh;
InvertedIndexPhase invIdxPh;
virtual void getPhases(ServerInterface &srvInterface,
std::vector<TransformFunctionPhase *> &phases)
{
fwardIdxPh.setPrepass(); // Process documents wherever they're originally stored.
phases.push_back(&fwardIdxPh);
phases.push_back(&invIdxPh);
}
virtual void getPrototype(ServerInterface &srvInterface,
ColumnTypes &argTypes,
ColumnTypes &returnType)
{
// Expected input: (doc_id INTEGER, text VARCHAR).
argTypes.addInt();
argTypes.addVarchar();
// Output is: (term VARCHAR, doc_id INTEGER, term_freq INTEGER, corp_freq INTEGER)
returnType.addVarchar();
returnType.addInt();
returnType.addInt();
returnType.addInt();
}
};
RegisterFactory(InvertedIndexFactory);
此示例中的大部分代码与 TransformFunctionFactory
类中的代码相似:
-
这两个
TransformFunctionPhase
子类都实施getReturnType()
函数,此函数描述每个阶段的输出。这与TransformFunctionFactory
类中的getReturnType()
函数相似。但是,此函数还可让您控制如何在多阶段 UDTF 的每个节点之间对数据进行分区和排序。第一个阶段调用
SizedColumnTypes::addVarcharPartitionColumn()
(而非仅调用addVarcharColumn()
),以将此阶段的输出表设置为按包含提取的词的列进行分区。此阶段还调用SizedColumnTypes::addOrderColumn()
,以便按文档 ID 列对输出表进行排序。此阶段调用该函数而非特定于数据类型的函数(例如addIntOrderColumn()
)之一,以便可以将原始列的数据类型传递到输出列。注意
UDTF 的最终阶段在其getReturnType()
函数中设置的任何按列排序或按列分区将被忽略。此阶段的输出会返回到启动程序节点,而不会先进行分区和重新排序再发送到其他阶段。 -
MultiPhaseTransformFunctionFactory
类实施getPrototype()
函数,此函数定义多阶段 UDTF 的输入和输出的架构。此函数与TransformFunctionFactory::getPrototype()
函数相同。
MultiPhaseTransformFunctionFactory
类实施的唯一函数是 getPhases()
。此函数可定义各个阶段的执行顺序。表示阶段的字段将以预期执行顺序推入该矢量。
您还可以在 MultiPhaseTransformFunctionFactory.getPhases()
函数中将 UDTF 的第一个阶段标记为对存储在节点本地的数据(而非分区到所有节点的数据)执行操作(称为“预通过”阶段)。使用此选项,您不必在 Vertica 群集中四处移动大量数据,从而提高多阶段 UDTF 的效率。
注意
只有 UDTF 的第一个阶段可以是预通过阶段。不能具有多个预通过阶段,此后的任何阶段都不能是预通过阶段。要将第一个阶段标记为预通过,您可以从 getPhase()
函数中调用第一个阶段的 TransformFunctionPhase 实例的 TransformFunctionPhase::setPrepass()
函数。
注意
-
您需要确保每个阶段的输出架构与下一个阶段所需的输入架构匹配。在示例代码中,每个
TransformFunctionPhase::getReturnType()
实施将对其输入架构和输出架构执行健全性检查。TransformFunction
子类还可以在其processPartition()
函数中执行这些检查。 -
您的多阶段 UDTF 可以拥有的阶段数没有内置限制。但,阶段数越多,使用的资源就越多。在隔离模式下运行时,Vertica 可能会终止使用太多内存的 UDTF。请参阅C++ UDx 的资源使用情况。