C++ example: average

The Average aggregate function created in this example computes the average of values in a column.

The Average aggregate function created in this example computes the average of values in a column.

You can find the source code used in this example on the Vertica GitHub page.

Loading the example

Use CREATE LIBRARY and CREATE AGGREGATE FUNCTION to declare the function:

=> CREATE LIBRARY AggregateFunctions AS
'/opt/vertica/sdk/examples/build/AggregateFunctions.so';
CREATE LIBRARY
=> CREATE aggregate function ag_avg AS LANGUAGE 'C++'
name 'AverageFactory' library AggregateFunctions;
CREATE AGGREGATE FUNCTION

Using the example

Use the function as part of a SELECT statement:


=> SELECT * FROM average;
id | count
----+---------
A  |       8
B  |       3
C  |       6
D  |       2
E  |       9
F  |       7
G  |       5
H  |       4
I  |       1
(9 rows)
=> SELECT ag_avg(count) FROM average;
ag_avg
--------
  5
(1 row)

AggregateFunction implementation

This example adds the input argument values in the aggregate() method and keeps a counter of the number of values added. The server runs aggregate() on every node and different data chunks, and combines all the individually added values and counters in the combine() method. Finally, the average value is computed in the terminate() method by dividing the total sum by the total number of values processed.

For this discussion, assume the following environment:

  • A three-node Vertica cluster

  • A table column that contains nine values that are evenly distributed across the nodes. Schematically, the nodes look like the following figure:

The function uses sum and count variables. Sum contains the sum of the values, and count contains the count of values.

First, initAggregate() initializes the variables and sets their values to zero.

virtual void initAggregate(ServerInterface &srvInterface,
                           IntermediateAggs &aggs)
{
  try {
    VNumeric &sum = aggs.getNumericRef(0);
    sum.setZero();

    vint &count = aggs.getIntRef(1);
    count = 0;
  }
  catch(std::exception &e) {
    vt_ report_ error(0, "Exception while initializing intermediate aggregates: [% s]", e.what());
  }
}

The aggregate() function reads the block of data on each node and calculates partial aggregates.

void aggregate(ServerInterface &srvInterface,
               BlockReader &argReader,
               IntermediateAggs &aggs)
{
     try {
          VNumeric &sum = aggs.getNumericRef(0);
          vint     &count = aggs.getIntRef(1);
          do {
              const VNumeric &input = argReader.getNumericRef(0);
              if (!input.isNull()) {
               sum.accumulate(&input);
           count++;
              }
          } while (argReader.next());
    } catch(std::exception &e) {
       vt_ report_ error(0, " Exception while processing aggregate: [% s]", e.what());
    }
}

Each completed instance of the aggregate() function returns multiple partial aggregates for sum and count. The following figure illustrates this process using the aggregate() function:

The combine() function puts together the partial aggregates calculated by each instance of the average function.

virtual void combine(ServerInterface &srvInterface,
                     IntermediateAggs &aggs,
                     MultipleIntermediateAggs &aggsOther)
{
    try {
        VNumeric       &mySum      = aggs.getNumericRef(0);
        vint           &myCount    = aggs.getIntRef(1);

        // Combine all the other intermediate aggregates
        do {
            const VNumeric &otherSum   = aggsOther.getNumericRef(0);
            const vint     &otherCount = aggsOther.getIntRef(1);

            // Do the actual accumulation
            mySum.accumulate(&otherSum);
            myCount += otherCount;

        } while (aggsOther.next());
    } catch(std::exception &e) {
        // Standard exception. Quit.
        vt_report_error(0, "Exception while combining intermediate aggregates: [%s]", e.what());
    }
}

The following figure shows how each partial aggregate is combined:

After all input has been evaluated by the aggregate() function Vertica calls the terminate() function. It returns the average to the caller.

virtual void terminate(ServerInterface &srvInterface,
                       BlockWriter &resWriter,
                       IntermediateAggs &aggs)
{
      try {
           const int32 MAX_INT_PRECISION = 20;
           const int32 prec = Basics::getNumericWordCount(MAX_INT_PRECISION);
           uint64 words[prec];
           VNumeric count(words,prec,0/*scale*/);
           count.copy(aggs.getIntRef(1));
           VNumeric &out = resWriter.getNumericRef();
           if (count.isZero()) {
               out.setNull();
           } else
               const VNumeric &sum = aggs.getNumericRef(0);
               out.div(&sum, &count);
        }
}

The following figure shows the implementation of the terminate() function:

AggregateFunctionFactory implementation

The getPrototype() function allows you to define the variables that are sent to your aggregate function and returned to Vertica after your aggregate function runs. The following example accepts and returns a numeric value:

virtual void getPrototype(ServerInterface &srvfloaterface,
                          ColumnTypes &argTypes,
                          ColumnTypes &returnType)
    {
        argTypes.addNumeric();
        returnType.addNumeric();
    }

The getIntermediateTypes() function defines any intermediate variables that you use in your aggregate function. Intermediate variables are values used to pass data among multiple invocations of an aggregate function. They are used to combine results until a final result can be computed. In this example, there are two results - total (numeric) and count (int).

 virtual void getIntermediateTypes(ServerInterface &srvInterface,
                                   const SizedColumnTypes &inputTypes,
                                   SizedColumnTypes &intermediateTypeMetaData)
    {
        const VerticaType &inType = inputTypes.getColumnType(0);
        intermediateTypeMetaData.addNumeric(interPrec, inType.getNumericScale());
        intermediateTypeMetaData.addInt();
    }

The getReturnType() function defines the output data type:

    virtual void getReturnType(ServerInterface &srvfloaterface,
                               const SizedColumnTypes &inputTypes,
                               SizedColumnTypes &outputTypes)
    {
        const VerticaType &inType = inputTypes.getColumnType(0);
        outputTypes.addNumeric(inType.getNumericPrecision(),
        inType.getNumericScale());
    }