C++ 示例:平均值
在此示例中创建的 Average
聚合函数将计算列中各值的平均值。
您可以在 Vertica GitHub 页面上找到此示例中使用的源代码。
加载示例
使用 CREATE LIBRARY 和 CREATE AGGREGATE FUNCTION 声明函数:
=> CREATE LIBRARY AggregateFunctions AS
'/opt/vertica/sdk/examples/build/AggregateFunctions.so';
CREATE LIBRARY
=> CREATE aggregate function ag_avg AS LANGUAGE 'C++'
name 'AverageFactory' library AggregateFunctions;
CREATE AGGREGATE FUNCTION
使用示例
将该函数用作 SELECT 语句的一部分:
=> SELECT * FROM average;
id | count
----+---------
A | 8
B | 3
C | 6
D | 2
E | 9
F | 7
G | 5
H | 4
I | 1
(9 rows)
=> SELECT ag_avg(count) FROM average;
ag_avg
--------
5
(1 row)
AggregateFunction 实施
此示例会在 aggregate()
方法中添加输入实参值,并保留所添加值数量的计数器。服务器会在每个节点和不同的数据块上运行 aggregate()
,并在 combine()
方法中组合所有单独添加的值和计数器。最后,在 terminate()
方法中通过用总和除以已处理值的总数来计算平均值。
为进行此讨论,假设采用以下环境:
-
三节点 Vertica 群集
-
包含九个值的表列,这些值均匀分布在各个节点上。根据示意图,各个节点如下图所示:
该函数使用 sum 和 count 变量。sum 包含值的总和,count 包含值的计数。
首先,initAggregate()
会初始化变量并将其值设为零。
virtual void initAggregate(ServerInterface &srvInterface,
IntermediateAggs &aggs)
{
try {
VNumeric &sum = aggs.getNumericRef(0);
sum.setZero();
vint &count = aggs.getIntRef(1);
count = 0;
}
catch(std::exception &e) {
vt_ report_ error(0, "Exception while initializing intermediate aggregates: [% s]", e.what());
}
}
aggregate()
函数会读取每个节点上的数据块并计算部分聚合。
void aggregate(ServerInterface &srvInterface,
BlockReader &argReader,
IntermediateAggs &aggs)
{
try {
VNumeric &sum = aggs.getNumericRef(0);
vint &count = aggs.getIntRef(1);
do {
const VNumeric &input = argReader.getNumericRef(0);
if (!input.isNull()) {
sum.accumulate(&input);
count++;
}
} while (argReader.next());
} catch(std::exception &e) {
vt_ report_ error(0, " Exception while processing aggregate: [% s]", e.what());
}
}
aggregate()
函数的每个已完成实例都会返回 sum 和 count 的多个部分聚合。下图使用 aggregate()
函数说明了此过程:
combine()
函数会将 average 函数的每个实例计算出的部分聚合组合在一起。
virtual void combine(ServerInterface &srvInterface,
IntermediateAggs &aggs,
MultipleIntermediateAggs &aggsOther)
{
try {
VNumeric &mySum = aggs.getNumericRef(0);
vint &myCount = aggs.getIntRef(1);
// Combine all the other intermediate aggregates
do {
const VNumeric &otherSum = aggsOther.getNumericRef(0);
const vint &otherCount = aggsOther.getIntRef(1);
// Do the actual accumulation
mySum.accumulate(&otherSum);
myCount += otherCount;
} while (aggsOther.next());
} catch(std::exception &e) {
// Standard exception. Quit.
vt_report_error(0, "Exception while combining intermediate aggregates: [%s]", e.what());
}
}
下图显示了如何组合每个部分聚合:
在 aggregate()
函数评估完所有输入之后,Vertica 会调用 terminate()
函数。该函数会向调用者返回平均值。
virtual void terminate(ServerInterface &srvInterface,
BlockWriter &resWriter,
IntermediateAggs &aggs)
{
try {
const int32 MAX_INT_PRECISION = 20;
const int32 prec = Basics::getNumericWordCount(MAX_INT_PRECISION);
uint64 words[prec];
VNumeric count(words,prec,0/*scale*/);
count.copy(aggs.getIntRef(1));
VNumeric &out = resWriter.getNumericRef();
if (count.isZero()) {
out.setNull();
} else
const VNumeric &sum = aggs.getNumericRef(0);
out.div(&sum, &count);
}
}
下图显示了 terminate()
函数的实施过程:
AggregateFunctionFactory 实施
使用 getPrototype()
函数,可以定义要发送到聚合函数的变量以及在聚合函数运行后返回给 Vertica 的变量。以下示例接受并返回了数值:
virtual void getPrototype(ServerInterface &srvfloaterface,
ColumnTypes &argTypes,
ColumnTypes &returnType)
{
argTypes.addNumeric();
returnType.addNumeric();
}
getIntermediateTypes()
函数可定义在聚合函数中使用的任何中间变量。中间变量是用于在多次调用聚合函数期间传递数据的值。它们用于组合结果,直到可以计算出最终结果。在此示例中,存在两个结果 - 总计(数字)和计数(整数)。
virtual void getIntermediateTypes(ServerInterface &srvInterface,
const SizedColumnTypes &inputTypes,
SizedColumnTypes &intermediateTypeMetaData)
{
const VerticaType &inType = inputTypes.getColumnType(0);
intermediateTypeMetaData.addNumeric(interPrec, inType.getNumericScale());
intermediateTypeMetaData.addInt();
}
getReturnType()
函数用于定义输出数据类型:
virtual void getReturnType(ServerInterface &srvfloaterface,
const SizedColumnTypes &inputTypes,
SizedColumnTypes &outputTypes)
{
const VerticaType &inType = inputTypes.getColumnType(0);
outputTypes.addNumeric(inType.getNumericPrecision(),
inType.getNumericScale());
}