这是本节的多页打印视图。 点击此处打印.

返回本页常规视图.

聚合函数 (UDAF)

聚合函数可对值集执行操作并返回单个值。Vertica 提供标准内置聚合函数,例如 AVGMAXMIN。用户定义的聚合函数 (UDAF) 提供类似的功能:

  • 支持单个输入列(或一组)值并提供单个输出列。

  • 支持 RLE 解压缩。RLE 输入会在发送到 UDAF 之前进行解压缩。

  • 支持与 GROUP BYHAVING 子句一起使用。只能选择出现在 GROUP BY 子句中的列。

限制

以下限制适用于 UDAF:

  • 仅适用于 C++。

  • 不能在隔离模式下运行。

  • 不能与相关子查询一起使用。

1 - AggregateFunction 类

AggregateFunction 类可执行聚合。它会计算存储相关数据的每个数据库节点上的值,然后组合各个节点中的结果。您必须实施以下方法:

  • initAggregate() - 初始化类、定义变量以及设置变量的起始值。此函数必须是等幂的。

  • aggregate() - 在每个节点上执行的主要聚合操作。

  • combine() - 如果需要多次调用 aggregate(),Vertica 会调用 combine() 以将所有子聚合合并为最终的聚合。虽然不可能调用此方法,但您必须定义此方法。

  • terminate() - 终止函数并以列的形式返回结果。

AggregateFunction 类还提供了以下两种可选方法,您可以实施这两种方法以分配和释放资源: setup()destroy()。您应使用这些方法来分配和取消分配那些不通过 UDAF API 分配的资源(有关详细信息,请参阅为 UDx 分配资源)。

API

仅 C++ 支持聚合函数。

AggregateFunction API 提供了以下通过子类扩展的方法:

virtual void setup(ServerInterface &srvInterface,
        const SizedColumnTypes &argTypes);

virtual void initAggregate(ServerInterface &srvInterface, IntermediateAggs &aggs)=0;

void aggregate(ServerInterface &srvInterface, BlockReader &arg_reader,
        IntermediateAggs &aggs);

virtual void combine(ServerInterface &srvInterface, IntermediateAggs &aggs_output,
        MultipleIntermediateAggs &aggs_other)=0;

virtual void terminate(ServerInterface &srvInterface, BlockWriter &res_writer,
        IntermediateAggs &aggs);

virtual void cancel(ServerInterface &srvInterface);

virtual void destroy(ServerInterface &srvInterface, const SizedColumnTypes &argTypes);

2 - AggregateFunctionFactory 类

AggregateFunctionFactory 类可指定元数据信息,例如,聚合函数的参数和返回类型。此类还可将 AggregateFunction 子类实例化。子类必须实施以下方法:

  • getPrototype() - 定义函数所接受的参数数量和数据类型。聚合函数接受单个参数。

  • getIntermediateTypes() - 定义函数所使用的中间变量。这些变量在组合 aggregate() 调用的结果时使用。

  • getReturnType() - 定义输出列的类型。

您的函数还可以实施 getParameterType(),后者定义了此函数使用的参数名称和类型。

当您调用 CREATE AGGREGATE FUNCTION SQL 语句以将函数添加到数据库编录时,Vertica 将使用这些数据。

API

仅 C++ 支持聚合函数。

AggregateFunctionFactory API 提供了以下通过子类扩展的方法:

virtual AggregateFunction *
        createAggregateFunction ServerInterface &srvInterface)=0;

virtual void getPrototype(ServerInterface &srvInterface,
        ColumnTypes &argTypes, ColumnTypes &returnType)=0;

virtual void getIntermediateTypes(ServerInterface &srvInterface,
        const SizedColumnTypes &inputTypes, SizedColumnTypes &intermediateTypeMetaData)=0;

virtual void getReturnType(ServerInterface &srvInterface,
        const SizedColumnTypes &argTypes, SizedColumnTypes &returnType)=0;

virtual void getParameterType(ServerInterface &srvInterface,
        SizedColumnTypes &parameterTypes);

3 - 包含 GROUP BY 子句的语句中的 UDAF 性能

如果调用 UDAF 的 SQL 语句还包含 GROUP BY 子句,该 UDAF 的性能可能会低于预期。例如:

=> SELECT a, MYUDAF(b) FROM sampletable GROUP BY a;

在类似于以上示例的语句中,Vertica 不会在调用 UDAF 的 aggregate() 方法之前将行数据合并到一起。相反,它会为每个数据行调用 aggregate() 一次。通常,让 Vertica 合并行数据的开销超过为每个数据行调用 aggregate() 的开销。但是,如果 UDAF 的 aggregate() 方法具有很大开销,您可能会发现 UDAF 的性能受到影响。

例如,假设内存由 aggregate() 分配。在包含 GROUP BY 子句的语句中调用此方法时,此方法将为每个数据行执行该内存分配。因为内存分配是一个成本相对高昂的过程,所以此分配会影响 UDAF 和查询的总体性能。

有两种方法可用于解决包含 GROUP BY 子句的语句中的 UDAF 性能问题:

  • 减少对 aggregate() 的每个调用的开销。如果可能,将任何分配或其他设置操作移到 UDAF 的 setup() 函数。

  • 声明一个特殊参数,以指示 Vertica 在调用 UDAF 时将行数据分组到一起。下文介绍此技术。

使用 _minimizeCallCount 参数

UDAF 可以指示 Vertica 始终将行数据分组到一起,以减少对其 aggregate() 方法调用的次数。要触发此行为,UDAF 必须声明一个名为 _minimizeCallCount 的整数参数。您不需要在 SQL 语句中为此参数设置值。如果 UDAF 声明此参数,将触发 Vertica 在调用 aggregate() 时将行数据分组到一起。

声明 _minimizeCallCount 参数的方法与声明其他 UDx 参数相同。有关详细信息,请参阅UDx 参数

4 - C++ 示例:平均值

在此示例中创建的 Average 聚合函数将计算列中各值的平均值。

您可以在 Vertica GitHub 页面上找到此示例中使用的源代码。

加载示例

使用 CREATE LIBRARY 和 CREATE AGGREGATE FUNCTION 声明函数:

=> CREATE LIBRARY AggregateFunctions AS
'/opt/vertica/sdk/examples/build/AggregateFunctions.so';
CREATE LIBRARY
=> CREATE aggregate function ag_avg AS LANGUAGE 'C++'
name 'AverageFactory' library AggregateFunctions;
CREATE AGGREGATE FUNCTION

使用示例

将该函数用作 SELECT 语句的一部分:


=> SELECT * FROM average;
id | count
----+---------
A  |       8
B  |       3
C  |       6
D  |       2
E  |       9
F  |       7
G  |       5
H  |       4
I  |       1
(9 rows)
=> SELECT ag_avg(count) FROM average;
ag_avg
--------
  5
(1 row)

AggregateFunction 实施

此示例会在 aggregate() 方法中添加输入实参值,并保留所添加值数量的计数器。服务器会在每个节点和不同的数据块上运行 aggregate() ,并在 combine() 方法中组合所有单独添加的值和计数器。最后,在 terminate() 方法中通过用总和除以已处理值的总数来计算平均值。

为进行此讨论,假设采用以下环境:

  • 三节点 Vertica 群集

  • 包含九个值的表列,这些值均匀分布在各个节点上。根据示意图,各个节点如下图所示:

该函数使用 sum 和 count 变量。sum 包含值的总和,count 包含值的计数。

首先,initAggregate() 会初始化变量并将其值设为零。

virtual void initAggregate(ServerInterface &srvInterface,
                           IntermediateAggs &aggs)
{
  try {
    VNumeric &sum = aggs.getNumericRef(0);
    sum.setZero();

    vint &count = aggs.getIntRef(1);
    count = 0;
  }
  catch(std::exception &e) {
    vt_ report_ error(0, "Exception while initializing intermediate aggregates: [% s]", e.what());
  }
}

aggregate() 函数会读取每个节点上的数据块并计算部分聚合。

void aggregate(ServerInterface &srvInterface,
               BlockReader &argReader,
               IntermediateAggs &aggs)
{
     try {
          VNumeric &sum = aggs.getNumericRef(0);
          vint     &count = aggs.getIntRef(1);
          do {
              const VNumeric &input = argReader.getNumericRef(0);
              if (!input.isNull()) {
               sum.accumulate(&input);
           count++;
              }
          } while (argReader.next());
    } catch(std::exception &e) {
       vt_ report_ error(0, " Exception while processing aggregate: [% s]", e.what());
    }
}

aggregate() 函数的每个已完成实例都会返回 sum 和 count 的多个部分聚合。下图使用 aggregate() 函数说明了此过程:

combine() 函数会将 average 函数的每个实例计算出的部分聚合组合在一起。

virtual void combine(ServerInterface &srvInterface,
                     IntermediateAggs &aggs,
                     MultipleIntermediateAggs &aggsOther)
{
    try {
        VNumeric       &mySum      = aggs.getNumericRef(0);
        vint           &myCount    = aggs.getIntRef(1);

        // Combine all the other intermediate aggregates
        do {
            const VNumeric &otherSum   = aggsOther.getNumericRef(0);
            const vint     &otherCount = aggsOther.getIntRef(1);

            // Do the actual accumulation
            mySum.accumulate(&otherSum);
            myCount += otherCount;

        } while (aggsOther.next());
    } catch(std::exception &e) {
        // Standard exception. Quit.
        vt_report_error(0, "Exception while combining intermediate aggregates: [%s]", e.what());
    }
}

下图显示了如何组合每个部分聚合:

aggregate() 函数评估完所有输入之后,Vertica 会调用 terminate() 函数。该函数会向调用者返回平均值。

virtual void terminate(ServerInterface &srvInterface,
                       BlockWriter &resWriter,
                       IntermediateAggs &aggs)
{
      try {
           const int32 MAX_INT_PRECISION = 20;
           const int32 prec = Basics::getNumericWordCount(MAX_INT_PRECISION);
           uint64 words[prec];
           VNumeric count(words,prec,0/*scale*/);
           count.copy(aggs.getIntRef(1));
           VNumeric &out = resWriter.getNumericRef();
           if (count.isZero()) {
               out.setNull();
           } else
               const VNumeric &sum = aggs.getNumericRef(0);
               out.div(&sum, &count);
        }
}

下图显示了 terminate() 函数的实施过程:

AggregateFunctionFactory 实施

使用 getPrototype() 函数,可以定义要发送到聚合函数的变量以及在聚合函数运行后返回给 Vertica 的变量。以下示例接受并返回了数值:

virtual void getPrototype(ServerInterface &srvfloaterface,
                          ColumnTypes &argTypes,
                          ColumnTypes &returnType)
    {
        argTypes.addNumeric();
        returnType.addNumeric();
    }

getIntermediateTypes() 函数可定义在聚合函数中使用的任何中间变量。中间变量是用于在多次调用聚合函数期间传递数据的值。它们用于组合结果,直到可以计算出最终结果。在此示例中,存在两个结果 - 总计(数字)和计数(整数)。

 virtual void getIntermediateTypes(ServerInterface &srvInterface,
                                   const SizedColumnTypes &inputTypes,
                                   SizedColumnTypes &intermediateTypeMetaData)
    {
        const VerticaType &inType = inputTypes.getColumnType(0);
        intermediateTypeMetaData.addNumeric(interPrec, inType.getNumericScale());
        intermediateTypeMetaData.addInt();
    }

getReturnType() 函数用于定义输出数据类型:

    virtual void getReturnType(ServerInterface &srvfloaterface,
                               const SizedColumnTypes &inputTypes,
                               SizedColumnTypes &outputTypes)
    {
        const VerticaType &inType = inputTypes.getColumnType(0);
        outputTypes.addNumeric(inType.getNumericPrecision(),
        inType.getNumericScale());
    }