C++ 示例:多阶段索引器

以下代码片段来自随 Vertica SDK 一起分发的 InvertedIndex UDTF 示例。此代码片段演示了将 MultiPhaseTransformFunctionFactory 子类化以及添加两个 TransformFunctionPhase 子类(定义此 UDTF 中的两个阶段)。

class InvertedIndexFactory : public MultiPhaseTransformFunctionFactory
{
public:
   /**
    * Extracts terms from documents.
    */
   class ForwardIndexPhase : public TransformFunctionPhase
   {
       virtual void getReturnType(ServerInterface &srvInterface,
                                  const SizedColumnTypes &inputTypes,
                                  SizedColumnTypes &outputTypes)
       {
           // Sanity checks on input we've been given.
           // Expected input: (doc_id INTEGER, text VARCHAR)
           vector<size_t> argCols;
           inputTypes.getArgumentColumns(argCols);
           if (argCols.size() < 2 ||
               !inputTypes.getColumnType(argCols.at(0)).isInt() ||
               !inputTypes.getColumnType(argCols.at(1)).isVarchar())
               vt_report_error(0, "Function only accepts two arguments"
                                "(INTEGER, VARCHAR))");
           // Output of this phase is:
           //   (term_freq INTEGER) OVER(PBY term VARCHAR OBY doc_id INTEGER)
           // Number of times term appears within a document.
           outputTypes.addInt("term_freq");
           // Add analytic clause columns: (PARTITION BY term ORDER BY doc_id).
           // The length of any term is at most the size of the entire document.
           outputTypes.addVarcharPartitionColumn(
                inputTypes.getColumnType(argCols.at(1)).getStringLength(),
                "term");
           // Add order column on the basis of the document id's data type.
           outputTypes.addOrderColumn(inputTypes.getColumnType(argCols.at(0)),
                                      "doc_id");
       }
       virtual TransformFunction *createTransformFunction(ServerInterface
                &srvInterface)
       { return vt_createFuncObj(srvInterface.allocator, ForwardIndexBuilder); }
   };
   /**
    * Constructs terms' posting lists.
    */
   class InvertedIndexPhase : public TransformFunctionPhase
   {
       virtual void getReturnType(ServerInterface &srvInterface,
                                  const SizedColumnTypes &inputTypes,
                                  SizedColumnTypes &outputTypes)
       {
           // Sanity checks on input we've been given.
           // Expected input:
           //   (term_freq INTEGER) OVER(PBY term VARCHAR OBY doc_id INTEGER)
           vector<size_t> argCols;
           inputTypes.getArgumentColumns(argCols);
           vector<size_t> pByCols;
           inputTypes.getPartitionByColumns(pByCols);
           vector<size_t> oByCols;
           inputTypes.getOrderByColumns(oByCols);
           if (argCols.size() != 1 || pByCols.size() != 1 || oByCols.size() != 1 ||
               !inputTypes.getColumnType(argCols.at(0)).isInt() ||
               !inputTypes.getColumnType(pByCols.at(0)).isVarchar() ||
               !inputTypes.getColumnType(oByCols.at(0)).isInt())
               vt_report_error(0, "Function expects an argument (INTEGER) with "
                               "analytic clause OVER(PBY VARCHAR OBY INTEGER)");
           // Output of this phase is:
           //   (term VARCHAR, doc_id INTEGER, term_freq INTEGER, corp_freq INTEGER).
           outputTypes.addVarchar(inputTypes.getColumnType(
                                    pByCols.at(0)).getStringLength(),"term");
           outputTypes.addInt("doc_id");
           // Number of times term appears within the document.
           outputTypes.addInt("term_freq");
           // Number of documents where the term appears in.
           outputTypes.addInt("corp_freq");
       }

       virtual TransformFunction *createTransformFunction(ServerInterface
                &srvInterface)
       { return vt_createFuncObj(srvInterface.allocator, InvertedIndexBuilder); }
   };
   ForwardIndexPhase fwardIdxPh;
   InvertedIndexPhase invIdxPh;
   virtual void getPhases(ServerInterface &srvInterface,
        std::vector<TransformFunctionPhase *> &phases)
   {
       fwardIdxPh.setPrepass(); // Process documents wherever they're originally stored.
       phases.push_back(&fwardIdxPh);
       phases.push_back(&invIdxPh);
   }
   virtual void getPrototype(ServerInterface &srvInterface,
                             ColumnTypes &argTypes,
                             ColumnTypes &returnType)
   {
       // Expected input: (doc_id INTEGER, text VARCHAR).
       argTypes.addInt();
       argTypes.addVarchar();
       // Output is: (term VARCHAR, doc_id INTEGER, term_freq INTEGER, corp_freq INTEGER)
       returnType.addVarchar();
       returnType.addInt();
       returnType.addInt();
       returnType.addInt();
   }
};
RegisterFactory(InvertedIndexFactory);

此示例中的大部分代码与 TransformFunctionFactory 类中的代码相似:

  • 这两个 TransformFunctionPhase 子类都实施 getReturnType() 函数,此函数描述每个阶段的输出。这与 TransformFunctionFactory 类中的 getReturnType() 函数相似。但是,此函数还可让您控制如何在多阶段 UDTF 的每个节点之间对数据进行分区和排序。

    第一个阶段调用 SizedColumnTypes::addVarcharPartitionColumn()(而非仅调用 addVarcharColumn()),以将此阶段的输出表设置为按包含提取的词的列进行分区。此阶段还调用 SizedColumnTypes::addOrderColumn(),以便按文档 ID 列对输出表进行排序。此阶段调用该函数而非特定于数据类型的函数(例如 addIntOrderColumn())之一,以便可以将原始列的数据类型传递到输出列。

  • MultiPhaseTransformFunctionFactory 类实施 getPrototype() 函数,此函数定义多阶段 UDTF 的输入和输出的架构。此函数与 TransformFunctionFactory::getPrototype() 函数相同。

MultiPhaseTransformFunctionFactory 类实施的唯一函数是 getPhases()。此函数可定义各个阶段的执行顺序。表示阶段的字段将以预期执行顺序推入该矢量。

您还可以在 MultiPhaseTransformFunctionFactory.getPhases() 函数中将 UDTF 的第一个阶段标记为对存储在节点本地的数据(而非分区到所有节点的数据)执行操作(称为“预通过”阶段)。使用此选项,您不必在 Vertica 群集中四处移动大量数据,从而提高多阶段 UDTF 的效率。

要将第一个阶段标记为预通过,您可以从 getPhase() 函数中调用第一个阶段的 TransformFunctionPhase 实例的 TransformFunctionPhase::setPrepass() 函数。

注意

  • 您需要确保每个阶段的输出架构与下一个阶段所需的输入架构匹配。在示例代码中,每个 TransformFunctionPhase::getReturnType() 实施将对其输入架构和输出架构执行健全性检查。TransformFunction 子类还可以在其 processPartition() 函数中执行这些检查。

  • 您的多阶段 UDTF 可以拥有的阶段数没有内置限制。但,阶段数越多,使用的资源就越多。在隔离模式下运行时,Vertica 可能会终止使用太多内存的 UDTF。请参阅C++ UDx 的资源使用情况