聚合函数可对值集执行操作并返回单个值。Vertica 提供标准内置聚合函数,例如 AVG、MAX 和 MIN。用户定义的聚合函数 (UDAF) 提供类似的功能:
-
支持单个输入列(或一组)值并提供单个输出列。
-
支持 RLE 解压缩。RLE 输入会在发送到 UDAF 之前进行解压缩。
限制
以下限制适用于 UDAF:
-
仅适用于 C++。
-
不能在隔离模式下运行。
-
不能与相关子查询一起使用。
AggregateFunction
类可执行聚合。它会计算存储相关数据的每个数据库节点上的值,然后组合各个节点中的结果。您必须实施以下方法:
initAggregate()
- 初始化类、定义变量以及设置变量的起始值。此函数必须是等幂的。
aggregate()
- 在每个节点上执行的主要聚合操作。
combine()
- 如果需要多次调用 aggregate()
,Vertica 会调用 combine()
以将所有子聚合合并为最终的聚合。虽然不可能调用此方法,但您必须定义此方法。
terminate()
- 终止函数并以列的形式返回结果。
aggregate()
函数可能无法同时对整个输入集执行操作。因此,initAggregate()
必须是等幂的。
AggregateFunction
类还提供了以下两种可选方法,您可以实施这两种方法以分配和释放资源: setup()
和 destroy()
。您应使用这些方法来分配和取消分配那些不通过 UDAF API 分配的资源(有关详细信息,请参阅为 UDx 分配资源)。
仅 C++ 支持聚合函数。
AggregateFunction API 提供了以下通过子类扩展的方法:
virtual void setup(ServerInterface &srvInterface,
const SizedColumnTypes &argTypes);
virtual void initAggregate(ServerInterface &srvInterface, IntermediateAggs &aggs)=0;
void aggregate(ServerInterface &srvInterface, BlockReader &arg_reader,
IntermediateAggs &aggs);
virtual void combine(ServerInterface &srvInterface, IntermediateAggs &aggs_output,
MultipleIntermediateAggs &aggs_other)=0;
virtual void terminate(ServerInterface &srvInterface, BlockWriter &res_writer,
IntermediateAggs &aggs);
virtual void cancel(ServerInterface &srvInterface);
virtual void destroy(ServerInterface &srvInterface, const SizedColumnTypes &argTypes);
AggregateFunctionFactory
类可指定元数据信息,例如,聚合函数的参数和返回类型。此类还可将 AggregateFunction
子类实例化。子类必须实施以下方法:
getPrototype()
- 定义函数所接受的参数数量和数据类型。聚合函数接受单个参数。
getIntermediateTypes()
- 定义函数所使用的中间变量。这些变量在组合 aggregate()
调用的结果时使用。
getReturnType()
- 定义输出列的类型。
您的函数还可以实施 getParameterType()
,后者定义了此函数使用的参数名称和类型。
当您调用 CREATE AGGREGATE FUNCTION SQL 语句以将函数添加到数据库编录时,Vertica 将使用这些数据。
仅 C++ 支持聚合函数。
AggregateFunctionFactory API 提供了以下通过子类扩展的方法:
virtual AggregateFunction *
createAggregateFunction ServerInterface &srvInterface)=0;
virtual void getPrototype(ServerInterface &srvInterface,
ColumnTypes &argTypes, ColumnTypes &returnType)=0;
virtual void getIntermediateTypes(ServerInterface &srvInterface,
const SizedColumnTypes &inputTypes, SizedColumnTypes &intermediateTypeMetaData)=0;
virtual void getReturnType(ServerInterface &srvInterface,
const SizedColumnTypes &argTypes, SizedColumnTypes &returnType)=0;
virtual void getParameterType(ServerInterface &srvInterface,
SizedColumnTypes ¶meterTypes);
如果调用 UDAF 的 SQL 语句还包含 GROUP BY 子句,该 UDAF 的性能可能会低于预期。例如:
=> SELECT a, MYUDAF(b) FROM sampletable GROUP BY a;
在类似于以上示例的语句中,Vertica 不会在调用 UDAF 的 aggregate()
方法之前将行数据合并到一起。相反,它会为每个数据行调用 aggregate()
一次。通常,让 Vertica 合并行数据的开销超过为每个数据行调用 aggregate()
的开销。但是,如果 UDAF 的 aggregate()
方法具有很大开销,您可能会发现 UDAF 的性能受到影响。
例如,假设内存由 aggregate()
分配。在包含 GROUP BY 子句的语句中调用此方法时,此方法将为每个数据行执行该内存分配。因为内存分配是一个成本相对高昂的过程,所以此分配会影响 UDAF 和查询的总体性能。
有两种方法可用于解决包含 GROUP BY 子句的语句中的 UDAF 性能问题:
减少对 aggregate()
的每个调用的开销。如果可能,将任何分配或其他设置操作移到 UDAF 的 setup()
函数。
声明一个特殊参数,以指示 Vertica 在调用 UDAF 时将行数据分组到一起。下文介绍此技术。
UDAF 可以指示 Vertica 始终将行数据分组到一起,以减少对其 aggregate()
方法调用的次数。要触发此行为,UDAF 必须声明一个名为 _minimizeCallCount
的整数参数。您不需要在 SQL 语句中为此参数设置值。如果 UDAF 声明此参数,将触发 Vertica 在调用 aggregate()
时将行数据分组到一起。
声明 _minimizeCallCount
参数的方法与声明其他 UDx 参数相同。有关详细信息,请参阅UDx 参数。
_minimizeCallCount
参数之前和之后,始终应测试 UDAF 的性能,以确保提高性能。您可能会发现让 Vertica 为 UDAF 分组行数据的开销超过重复调用 aggregate()
的成本。
在此示例中创建的 Average
聚合函数将计算列中各值的平均值。
您可以在 Vertica GitHub 页面上找到此示例中使用的源代码。
使用 CREATE LIBRARY 和 CREATE AGGREGATE FUNCTION 声明函数:
=> CREATE LIBRARY AggregateFunctions AS
'/opt/vertica/sdk/examples/build/AggregateFunctions.so';
CREATE LIBRARY
=> CREATE aggregate function ag_avg AS LANGUAGE 'C++'
name 'AverageFactory' library AggregateFunctions;
CREATE AGGREGATE FUNCTION
将该函数用作 SELECT 语句的一部分:
=> SELECT * FROM average;
id | count
----+---------
A | 8
B | 3
C | 6
D | 2
E | 9
F | 7
G | 5
H | 4
I | 1
(9 rows)
=> SELECT ag_avg(count) FROM average;
ag_avg
--------
5
(1 row)
此示例会在 aggregate()
方法中添加输入实参值,并保留所添加值数量的计数器。服务器会在每个节点和不同的数据块上运行 aggregate()
,并在 combine()
方法中组合所有单独添加的值和计数器。最后,在 terminate()
方法中通过用总和除以已处理值的总数来计算平均值。
为进行此讨论,假设采用以下环境:
三节点 Vertica 群集
包含九个值的表列,这些值均匀分布在各个节点上。根据示意图,各个节点如下图所示:
该函数使用 sum 和 count 变量。sum 包含值的总和,count 包含值的计数。
首先,initAggregate()
会初始化变量并将其值设为零。
virtual void initAggregate(ServerInterface &srvInterface,
IntermediateAggs &aggs)
{
try {
VNumeric &sum = aggs.getNumericRef(0);
sum.setZero();
vint &count = aggs.getIntRef(1);
count = 0;
}
catch(std::exception &e) {
vt_ report_ error(0, "Exception while initializing intermediate aggregates: [% s]", e.what());
}
}
aggregate()
函数会读取每个节点上的数据块并计算部分聚合。
void aggregate(ServerInterface &srvInterface,
BlockReader &argReader,
IntermediateAggs &aggs)
{
try {
VNumeric &sum = aggs.getNumericRef(0);
vint &count = aggs.getIntRef(1);
do {
const VNumeric &input = argReader.getNumericRef(0);
if (!input.isNull()) {
sum.accumulate(&input);
count++;
}
} while (argReader.next());
} catch(std::exception &e) {
vt_ report_ error(0, " Exception while processing aggregate: [% s]", e.what());
}
}
aggregate()
函数的每个已完成实例都会返回 sum 和 count 的多个部分聚合。下图使用 aggregate()
函数说明了此过程:
combine()
函数会将 average 函数的每个实例计算出的部分聚合组合在一起。
virtual void combine(ServerInterface &srvInterface,
IntermediateAggs &aggs,
MultipleIntermediateAggs &aggsOther)
{
try {
VNumeric &mySum = aggs.getNumericRef(0);
vint &myCount = aggs.getIntRef(1);
// Combine all the other intermediate aggregates
do {
const VNumeric &otherSum = aggsOther.getNumericRef(0);
const vint &otherCount = aggsOther.getIntRef(1);
// Do the actual accumulation
mySum.accumulate(&otherSum);
myCount += otherCount;
} while (aggsOther.next());
} catch(std::exception &e) {
// Standard exception. Quit.
vt_report_error(0, "Exception while combining intermediate aggregates: [%s]", e.what());
}
}
下图显示了如何组合每个部分聚合:
在 aggregate()
函数评估完所有输入之后,Vertica 会调用 terminate()
函数。该函数会向调用者返回平均值。
virtual void terminate(ServerInterface &srvInterface,
BlockWriter &resWriter,
IntermediateAggs &aggs)
{
try {
const int32 MAX_INT_PRECISION = 20;
const int32 prec = Basics::getNumericWordCount(MAX_INT_PRECISION);
uint64 words[prec];
VNumeric count(words,prec,0/*scale*/);
count.copy(aggs.getIntRef(1));
VNumeric &out = resWriter.getNumericRef();
if (count.isZero()) {
out.setNull();
} else
const VNumeric &sum = aggs.getNumericRef(0);
out.div(&sum, &count);
}
}
下图显示了 terminate()
函数的实施过程:
使用 getPrototype()
函数,可以定义要发送到聚合函数的变量以及在聚合函数运行后返回给 Vertica 的变量。以下示例接受并返回了数值:
virtual void getPrototype(ServerInterface &srvfloaterface,
ColumnTypes &argTypes,
ColumnTypes &returnType)
{
argTypes.addNumeric();
returnType.addNumeric();
}
getIntermediateTypes()
函数可定义在聚合函数中使用的任何中间变量。中间变量是用于在多次调用聚合函数期间传递数据的值。它们用于组合结果,直到可以计算出最终结果。在此示例中,存在两个结果 - 总计(数字)和计数(整数)。
virtual void getIntermediateTypes(ServerInterface &srvInterface,
const SizedColumnTypes &inputTypes,
SizedColumnTypes &intermediateTypeMetaData)
{
const VerticaType &inType = inputTypes.getColumnType(0);
intermediateTypeMetaData.addNumeric(interPrec, inType.getNumericScale());
intermediateTypeMetaData.addInt();
}
getReturnType()
函数用于定义输出数据类型:
virtual void getReturnType(ServerInterface &srvfloaterface,
const SizedColumnTypes &inputTypes,
SizedColumnTypes &outputTypes)
{
const VerticaType &inType = inputTypes.getColumnType(0);
outputTypes.addNumeric(inType.getNumericPrecision(),
inType.getNumericScale());
}