C++ example: PolyNthValue

The PolyNthValue example is an analytic function that returns the value in the Nth row in each partition in its input. This function is a generalization of FIRST_VALUE [analytic] and LAST_VALUE [analytic].

The values can be of any primitive data type.
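
To make the relationship to FIRST_VALUE and LAST_VALUE concrete, the following is a minimal standalone sketch in plain C++17. It does not use the Vertica SDK. The nthValue helper is hypothetical, and treating n = 0 as "no such row" is an assumption made for this illustration only:

```cpp
#include <cstddef>
#include <optional>
#include <vector>

// Hypothetical helper, not part of the Vertica SDK: returns the value in
// the n-th row (1-based) of a partition, or std::nullopt (standing in for
// SQL NULL) when the partition has no n-th row.
template <typename T>
std::optional<T> nthValue(const std::vector<T> &partition, std::size_t n)
{
    if (n == 0 || n > partition.size()) {
        return std::nullopt;
    }
    return partition[n - 1];
}
```

In this model, nthValue(scores, 1) plays the role of FIRST_VALUE and nthValue(scores, scores.size()) plays the role of LAST_VALUE over the whole partition. Because the helper is a template, the same code works for any element type, which mirrors the polymorphic behavior of the example.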

For the complete source code, see PolymorphicNthValue.cpp in the SDK examples directory (/opt/vertica/sdk/examples/AnalyticFunctions/).

Loading and using the example

Load the library and create the function as follows:

=> CREATE LIBRARY AnalyticFunctions AS '/home/dbadmin/AnalyticFns.so';
CREATE LIBRARY

=> CREATE ANALYTIC FUNCTION poly_nth_value AS LANGUAGE 'C++'
   NAME 'PolyNthValueFactory' LIBRARY AnalyticFunctions;
CREATE ANALYTIC FUNCTION

Consider a table of scores for different test groups:

=> SELECT cohort, score FROM trials;
 cohort | score
--------+-------
   1    | 9
   1    | 8
   1    | 7
   3    | 3
   3    | 2
   3    | 1
   2    | 4
   2    | 5
   2    | 6
(9 rows)

Call the function in a query that uses an OVER clause to partition the data. This example returns the score from the second row of each cohort, which for this data is also the second-highest score:

=> SELECT cohort, score, poly_nth_value(score USING PARAMETERS n=2) OVER (PARTITION BY cohort) AS nth_value
FROM trials;
 cohort | score | nth_value
--------+-------+-----------
   1    | 9     |         8
   1    | 8     |         8
   1    | 7     |         8
   3    | 3     |         2
   3    | 2     |         2
   3    | 1     |         2
   2    | 4     |         5
   2    | 5     |         5
   2    | 6     |         5
(9 rows)
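
The result above can be reproduced outside the database with a short standalone sketch. The Row struct and the nthValuePerCohort function are hypothetical names introduced for illustration. The sketch assumes that rows reach each partition in the order shown in the trials table:

```cpp
#include <cstddef>
#include <map>
#include <optional>
#include <vector>

// One input row of the trials table (hypothetical in-memory form).
struct Row {
    int cohort;
    int score;
};

// Standalone model of
//   poly_nth_value(score USING PARAMETERS n=...) OVER (PARTITION BY cohort).
// Returns one entry per input row, in input order. std::nullopt stands in
// for SQL NULL when a cohort has fewer than n rows.
std::vector<std::optional<int>> nthValuePerCohort(const std::vector<Row> &rows, std::size_t n)
{
    // Collect each cohort's scores in the order the rows arrive.
    std::map<int, std::vector<int>> partitions;
    for (const Row &r : rows) {
        partitions[r.cohort].push_back(r.score);
    }

    // Every row in a cohort receives that cohort's n-th score.
    std::vector<std::optional<int>> result;
    result.reserve(rows.size());
    for (const Row &r : rows) {
        const std::vector<int> &scores = partitions[r.cohort];
        if (n >= 1 && n <= scores.size()) {
            result.push_back(scores[n - 1]);
        } else {
            result.push_back(std::nullopt);
        }
    }
    return result;
}
```

With the nine rows from trials and n = 2, this model yields 8 for cohort 1, 2 for cohort 3, and 5 for cohort 2, matching the nth_value column. Because the model picks a row by position, the result depends on the order of rows within each partition.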

Factory implementation

The factory declares that the class is polymorphic, and then sets the return type based on the input type. Two factory methods specify the argument and return types.

Use the getPrototype() method to declare that the analytic function takes and returns any type:

    void getPrototype(ServerInterface &srvInterface, ColumnTypes &argTypes, ColumnTypes &returnType)
    {
        // This function supports any argument data type
        argTypes.addAny();

        // Output data type will be the same as the argument data type
        // We will specify that in getReturnType()
        returnType.addAny();
    }

The getReturnType() method is called at runtime. This is where you set the return type based on the input type:

    void getReturnType(ServerInterface &srvInterface, const SizedColumnTypes &inputTypes,
                       SizedColumnTypes &outputTypes)
    {
        // This function accepts only one argument
        // Complain if we find a different number
        std::vector<size_t> argCols;
        inputTypes.getArgumentColumns(argCols); // get argument column indices

        if (argCols.size() != 1)
        {
            vt_report_error(0, "Only one argument is expected but %s provided",
                            argCols.size()? std::to_string(argCols.size()).c_str() : "none");
        }

        // Define output type the same as argument type
        outputTypes.addArg(inputTypes.getColumnType(argCols[0]), inputTypes.getColumnName(argCols[0]));
    }
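
The rule that getReturnType() enforces can be modeled without the SDK. The following sketch is an illustration only: ColumnDesc and resolveReturnType are hypothetical stand-ins for the SDK's column metadata, and the sketch throws a standard exception where the example calls vt_report_error():

```cpp
#include <cstddef>
#include <stdexcept>
#include <string>
#include <vector>

// Hypothetical stand-in for one column entry in the SDK's type metadata.
struct ColumnDesc {
    std::string typeName;   // for example "INTEGER" or "VARCHAR(20)"
    std::string columnName;
};

// Mirrors the rule in getReturnType(): exactly one argument is accepted,
// and the output column takes the type and name of that argument.
ColumnDesc resolveReturnType(const std::vector<ColumnDesc> &args)
{
    if (args.size() != 1) {
        // Same wording as the example's error: a count, or "none" for zero.
        const std::string count = args.empty() ? "none" : std::to_string(args.size());
        throw std::invalid_argument("Only one argument is expected but " + count + " provided");
    }
    return args[0];
}
```

Passing a single INTEGER column named score returns an INTEGER column named score. Passing zero or several columns raises an error instead of guessing which argument determines the output type.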

Function implementation

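The analytic function itself is type-agnostic. It advances the input reader to the Nth row, and then copies that row's value to every output row, or writes NULL if the partition has fewer than N rows: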

    void processPartition(ServerInterface &srvInterface, AnalyticPartitionReader &inputReader,
                          AnalyticPartitionWriter &outputWriter)
    {
        try {
            const SizedColumnTypes &inTypes = inputReader.getTypeMetaData();
            std::vector<size_t> argCols; // Argument column indexes.
            inTypes.getArgumentColumns(argCols);

            vint currentRow = 1;
            bool nthRowExists = false;

            // Find the value of the n-th row
            do {
                if (currentRow == this->n) {
                    nthRowExists = true;
                    break;
                } else {
                    currentRow++;
                }
            } while (inputReader.next());

            if (nthRowExists) {
                do {
                    // Return n-th value
                    outputWriter.copyFromInput(0 /*dest column*/, inputReader,
                                               argCols[0] /*source column*/);
                } while (outputWriter.next());
            } else {
                // The partition has fewer than n rows
                // Return NULL value
                do {
                    outputWriter.setNull(0);
                } while (outputWriter.next());
            }
        } catch(std::exception& e) {
            // Standard exception. Quit.
            vt_report_error(0, "Exception while processing partition: [%s]", e.what());
        }
    }
};
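
The two-phase control flow in processPartition() can be traced with a standalone model. This sketch does not use the SDK. The Cursor class is a hypothetical stand-in for the reader and writer, and it assumes that a cursor starts on the first row and that next() returns false once it moves past the last row:

```cpp
#include <cstddef>
#include <optional>
#include <vector>

// Hypothetical minimal cursor over rowCount rows. It starts on row 0, and
// next() advances and reports whether the new position is still a row.
class Cursor {
public:
    explicit Cursor(std::size_t rowCount) : rowCount_(rowCount) {}
    bool next() { return ++pos_ < rowCount_; }
    std::size_t position() const { return pos_; }
private:
    std::size_t rowCount_;
    std::size_t pos_ = 0;
};

// Model of processPartition() for an int column. std::nullopt stands in
// for SQL NULL. The parameter n uses long here in place of the SDK's vint.
std::vector<std::optional<int>> processPartitionModel(const std::vector<int> &input, long n)
{
    if (input.empty()) {
        return {}; // Guard added for the model; the loops assume one row.
    }
    Cursor reader(input.size());
    Cursor writer(input.size());
    std::vector<std::optional<int>> output(input.size());

    long currentRow = 1;
    bool nthRowExists = false;

    // Phase 1: advance the reader until it sits on the n-th row.
    do {
        if (currentRow == n) {
            nthRowExists = true;
            break;
        } else {
            currentRow++;
        }
    } while (reader.next());

    // Phase 2: the reader stays put; only the writer advances.
    do {
        if (nthRowExists) {
            output[writer.position()] = input[reader.position()];
        } else {
            output[writer.position()] = std::nullopt;
        }
    } while (writer.next());

    return output;
}
```

The point of the model is that the reader and writer move independently. After the first loop, the reader remains on the Nth row, so each pass of the second loop copies the same value to the next output row.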