C++ 示例:平均值

在此示例中创建的 Average 聚合函数将计算列中各值的平均值。

您可以在 Vertica GitHub 页面上找到此示例中使用的源代码。

加载示例

使用 CREATE LIBRARY 和 CREATE AGGREGATE FUNCTION 声明函数:

=> CREATE LIBRARY AggregateFunctions AS
'/opt/vertica/sdk/examples/build/AggregateFunctions.so';
CREATE LIBRARY
=> CREATE aggregate function ag_avg AS LANGUAGE 'C++'
name 'AverageFactory' library AggregateFunctions;
CREATE AGGREGATE FUNCTION

使用示例

将该函数用作 SELECT 语句的一部分:


=> SELECT * FROM average;
id | count
----+---------
A  |       8
B  |       3
C  |       6
D  |       2
E  |       9
F  |       7
G  |       5
H  |       4
I  |       1
(9 rows)
=> SELECT ag_avg(count) FROM average;
ag_avg
--------
  5
(1 row)

AggregateFunction 实施

此示例会在 aggregate() 方法中添加输入实参值,并保留所添加值数量的计数器。服务器会在每个节点和不同的数据块上运行 aggregate() ,并在 combine() 方法中组合所有单独添加的值和计数器。最后,在 terminate() 方法中通过用总和除以已处理值的总数来计算平均值。

为进行此讨论,假设采用以下环境:

  • 三节点 Vertica 群集

  • 包含九个值的表列,这些值均匀分布在各个节点上。根据示意图,各个节点如下图所示:

该函数使用 sum 和 count 变量。sum 包含值的总和,count 包含值的计数。

首先,initAggregate() 会初始化变量并将其值设为零。

virtual void initAggregate(ServerInterface &srvInterface,
                           IntermediateAggs &aggs)
{
  try {
    VNumeric &sum = aggs.getNumericRef(0);
    sum.setZero();

    vint &count = aggs.getIntRef(1);
    count = 0;
  }
  catch(std::exception &e) {
    vt_ report_ error(0, "Exception while initializing intermediate aggregates: [% s]", e.what());
  }
}

aggregate() 函数会读取每个节点上的数据块并计算部分聚合。

void aggregate(ServerInterface &srvInterface,
               BlockReader &argReader,
               IntermediateAggs &aggs)
{
     try {
          VNumeric &sum = aggs.getNumericRef(0);
          vint     &count = aggs.getIntRef(1);
          do {
              const VNumeric &input = argReader.getNumericRef(0);
              if (!input.isNull()) {
               sum.accumulate(&input);
           count++;
              }
          } while (argReader.next());
    } catch(std::exception &e) {
       vt_ report_ error(0, " Exception while processing aggregate: [% s]", e.what());
    }
}

aggregate() 函数的每个已完成实例都会返回 sum 和 count 的多个部分聚合。下图使用 aggregate() 函数说明了此过程:

combine() 函数会将 average 函数的每个实例计算出的部分聚合组合在一起。

virtual void combine(ServerInterface &srvInterface,
                     IntermediateAggs &aggs,
                     MultipleIntermediateAggs &aggsOther)
{
    try {
        VNumeric       &mySum      = aggs.getNumericRef(0);
        vint           &myCount    = aggs.getIntRef(1);

        // Combine all the other intermediate aggregates
        do {
            const VNumeric &otherSum   = aggsOther.getNumericRef(0);
            const vint     &otherCount = aggsOther.getIntRef(1);

            // Do the actual accumulation
            mySum.accumulate(&otherSum);
            myCount += otherCount;

        } while (aggsOther.next());
    } catch(std::exception &e) {
        // Standard exception. Quit.
        vt_report_error(0, "Exception while combining intermediate aggregates: [%s]", e.what());
    }
}

下图显示了如何组合每个部分聚合:

aggregate() 函数评估完所有输入之后,Vertica 会调用 terminate() 函数。该函数会向调用者返回平均值。

virtual void terminate(ServerInterface &srvInterface,
                       BlockWriter &resWriter,
                       IntermediateAggs &aggs)
{
      try {
           const int32 MAX_INT_PRECISION = 20;
           const int32 prec = Basics::getNumericWordCount(MAX_INT_PRECISION);
           uint64 words[prec];
           VNumeric count(words,prec,0/*scale*/);
           count.copy(aggs.getIntRef(1));
           VNumeric &out = resWriter.getNumericRef();
           if (count.isZero()) {
               out.setNull();
           } else
               const VNumeric &sum = aggs.getNumericRef(0);
               out.div(&sum, &count);
        }
}

下图显示了 terminate() 函数的实施过程:

AggregateFunctionFactory 实施

使用 getPrototype() 函数,可以定义要发送到聚合函数的变量以及在聚合函数运行后返回给 Vertica 的变量。以下示例接受并返回了数值:

virtual void getPrototype(ServerInterface &srvfloaterface,
                          ColumnTypes &argTypes,
                          ColumnTypes &returnType)
    {
        argTypes.addNumeric();
        returnType.addNumeric();
    }

getIntermediateTypes() 函数可定义在聚合函数中使用的任何中间变量。中间变量是用于在多次调用聚合函数期间传递数据的值。它们用于组合结果,直到可以计算出最终结果。在此示例中,存在两个结果 - 总计(数字)和计数(整数)。

 virtual void getIntermediateTypes(ServerInterface &srvInterface,
                                   const SizedColumnTypes &inputTypes,
                                   SizedColumnTypes &intermediateTypeMetaData)
    {
        const VerticaType &inType = inputTypes.getColumnType(0);
        intermediateTypeMetaData.addNumeric(interPrec, inType.getNumericScale());
        intermediateTypeMetaData.addInt();
    }

getReturnType() 函数用于定义输出数据类型:

    virtual void getReturnType(ServerInterface &srvfloaterface,
                               const SizedColumnTypes &inputTypes,
                               SizedColumnTypes &outputTypes)
    {
        const VerticaType &inType = inputTypes.getColumnType(0);
        outputTypes.addNumeric(inType.getNumericPrecision(),
        inType.getNumericScale());
    }