C++ example: multi-phase indexer

The following code fragment is from the InvertedIndex UDTF example distributed with the Vertica SDK.

The following code fragment is from the InvertedIndex UDTF example distributed with the Vertica SDK. It demonstrates subclassing the MultiPhaseTransformFunctionFactory including two TransformFunctionPhase subclasses that define the two phases in this UDTF.

class InvertedIndexFactory : public MultiPhaseTransformFunctionFactory
{
public:
   /**
    * Extracts terms from documents.
    */
   class ForwardIndexPhase : public TransformFunctionPhase
   {
       virtual void getReturnType(ServerInterface &srvInterface,
                                  const SizedColumnTypes &inputTypes,
                                  SizedColumnTypes &outputTypes)
       {
           // Sanity checks on input we've been given.
           // Expected input: (doc_id INTEGER, text VARCHAR)
           vector<size_t> argCols;
           inputTypes.getArgumentColumns(argCols);
           if (argCols.size() < 2 ||
               !inputTypes.getColumnType(argCols.at(0)).isInt() ||
               !inputTypes.getColumnType(argCols.at(1)).isVarchar())
               vt_report_error(0, "Function only accepts two arguments"
                                "(INTEGER, VARCHAR))");
           // Output of this phase is:
           //   (term_freq INTEGER) OVER(PBY term VARCHAR OBY doc_id INTEGER)
           // Number of times term appears within a document.
           outputTypes.addInt("term_freq");
           // Add analytic clause columns: (PARTITION BY term ORDER BY doc_id).
           // The length of any term is at most the size of the entire document.
           outputTypes.addVarcharPartitionColumn(
                inputTypes.getColumnType(argCols.at(1)).getStringLength(),
                "term");
           // Add order column on the basis of the document id's data type.
           outputTypes.addOrderColumn(inputTypes.getColumnType(argCols.at(0)),
                                      "doc_id");
       }
       virtual TransformFunction *createTransformFunction(ServerInterface
                &srvInterface)
       { return vt_createFuncObj(srvInterface.allocator, ForwardIndexBuilder); }
   };
   /**
    * Constructs terms' posting lists.
    */
   class InvertedIndexPhase : public TransformFunctionPhase
   {
       virtual void getReturnType(ServerInterface &srvInterface,
                                  const SizedColumnTypes &inputTypes,
                                  SizedColumnTypes &outputTypes)
       {
           // Sanity checks on input we've been given.
           // Expected input:
           //   (term_freq INTEGER) OVER(PBY term VARCHAR OBY doc_id INTEGER)
           vector<size_t> argCols;
           inputTypes.getArgumentColumns(argCols);
           vector<size_t> pByCols;
           inputTypes.getPartitionByColumns(pByCols);
           vector<size_t> oByCols;
           inputTypes.getOrderByColumns(oByCols);
           if (argCols.size() != 1 || pByCols.size() != 1 || oByCols.size() != 1 ||
               !inputTypes.getColumnType(argCols.at(0)).isInt() ||
               !inputTypes.getColumnType(pByCols.at(0)).isVarchar() ||
               !inputTypes.getColumnType(oByCols.at(0)).isInt())
               vt_report_error(0, "Function expects an argument (INTEGER) with "
                               "analytic clause OVER(PBY VARCHAR OBY INTEGER)");
           // Output of this phase is:
           //   (term VARCHAR, doc_id INTEGER, term_freq INTEGER, corp_freq INTEGER).
           outputTypes.addVarchar(inputTypes.getColumnType(
                                    pByCols.at(0)).getStringLength(),"term");
           outputTypes.addInt("doc_id");
           // Number of times term appears within the document.
           outputTypes.addInt("term_freq");
           // Number of documents where the term appears in.
           outputTypes.addInt("corp_freq");
       }

       virtual TransformFunction *createTransformFunction(ServerInterface
                &srvInterface)
       { return vt_createFuncObj(srvInterface.allocator, InvertedIndexBuilder); }
   };
   ForwardIndexPhase fwardIdxPh;
   InvertedIndexPhase invIdxPh;
   virtual void getPhases(ServerInterface &srvInterface,
        std::vector<TransformFunctionPhase *> &phases)
   {
       fwardIdxPh.setPrepass(); // Process documents wherever they're originally stored.
       phases.push_back(&fwardIdxPh);
       phases.push_back(&invIdxPh);
   }
   virtual void getPrototype(ServerInterface &srvInterface,
                             ColumnTypes &argTypes,
                             ColumnTypes &returnType)
   {
       // Expected input: (doc_id INTEGER, text VARCHAR).
       argTypes.addInt();
       argTypes.addVarchar();
       // Output is: (term VARCHAR, doc_id INTEGER, term_freq INTEGER, corp_freq INTEGER)
       returnType.addVarchar();
       returnType.addInt();
       returnType.addInt();
       returnType.addInt();
   }
};
RegisterFactory(InvertedIndexFactory);

Most of the code in this example is similar to the code in a TransformFunctionFactory class:

  • Both TransformFunctionPhase subclasses implement the getReturnType() function, which describes the output of each stage. This is the similar to the getReturnType() function from the TransformFunctionFactory class. However, this function also lets you control how the data is partitioned and ordered between each phase of your multi-phase UDTF.

    The first phase calls SizedColumnTypes::addVarcharPartitionColumn() (rather than just addVarcharColumn()) to set the phase's output table to be partitioned by the column containing the extracted words. It also calls SizedColumnTypes::addOrderColumn() to order the output table by the document ID column. It calls this function instead of one of the data-type-specific functions (such as addIntOrderColumn()) so it can pass the data type of the original column through to the output column.

  • The MultiPhaseTransformFunctionFactory class implements the getPrototype() function, that defines the schemas for the input and output of the multi-phase UDTF. This function is the same as the TransformFunctionFactory::getPrototype() function.

The unique function implemented by the MultiPhaseTransformFunctionFactory class is getPhases(). This function defines the order in which the phases are executed. The fields that represent the phases are pushed into this vector in the order they should execute.

The MultiPhaseTransformFunctionFactory.getPhases() function is also where you flag the first phase of the UDTF as operating on data stored locally on the node (called a "pre-pass" phase) rather than on data partitioned across all nodes. Using this option increases the efficiency of your multi-phase UDTF by avoiding having to move significant amounts of data around the Vertica cluster.

To mark the first phase as pre-pass, you call the TransformFunctionPhase::setPrepass() function of the first phase's TransformFunctionPhase instance from within the getPhase() function.

Notes

  • You need to ensure that the output schema of each phase matches the input schema expected by the next phase. In the example code, each TransformFunctionPhase::getReturnType() implementation performs a sanity check on its input and output schemas. Your TransformFunction subclasses can also perform these checks in their processPartition() function.

  • There is no built-in limit on the number of phases that your multi-phase UDTF can have. However, more phases use more resources. When running in fenced mode, Vertica may terminate UDTFs that use too much memory. See Resource use for C++ UDxs.