C++ 示例:PolyNthValue
PolyNthValue 示例是一个分析函数,它返回其输入中每个分区的第 N 行中的值。该函数是 FIRST_VALUE [analytic] 和 LAST_VALUE [analytic] 的泛化。
这些值可以是任何基元数据类型。
有关完整的源代码,请参阅示例(位于 /opt/vertica/sdk/examples/AnalyticFunctions/
中)中的 PolymorphicNthValue.cpp
。
加载和使用示例
加载库并创建函数,如下所示:
=> CREATE LIBRARY AnalyticFunctions AS '/home/dbadmin/AnalyticFns.so';
CREATE LIBRARY
=> CREATE ANALYTIC FUNCTION poly_nth_value AS LANGUAGE 'C++'
NAME 'PolyNthValueFactory' LIBRARY AnalyticFunctions;
CREATE ANALYTIC FUNCTION
考虑不同测试组的分数表:
=> SELECT cohort, score FROM trials;
cohort | score
--------+-------
1 | 9
1 | 8
1 | 7
3 | 3
3 | 2
3 | 1
2 | 4
2 | 5
2 | 6
(9 rows)
在使用 OVER 子句对数据进行分区的查询中调用该函数。此示例返回每个同类群组中的第二高分:
=> SELECT cohort, score, poly_nth_value(score USING PARAMETERS n=2) OVER (PARTITION BY cohort) AS nth_value
FROM trials;
cohort | score | nth_value
--------+-------+-----------
1 | 9 | 8
1 | 8 | 8
1 | 7 | 8
3 | 3 | 2
3 | 2 | 2
3 | 1 | 2
2 | 4 | 5
2 | 5 | 5
2 | 6 | 5
(9 rows)
工厂实施
工厂先声明类是多态的,然后根据输入类型设置返回类型。两个工厂方法指定实参和返回类型。
使用 getPrototype()
方法声明分析函数接受并返回任何类型:
void getPrototype(ServerInterface &srvInterface, ColumnTypes &argTypes, ColumnTypes &returnType)
{
// This function supports any argument data type
argTypes.addAny();
// Output data type will be the same as the argument data type
// We will specify that in getReturnType()
returnType.addAny();
}
在运行时调用 getReturnType()
方法。这是您根据输入类型设置返回类型的地方:
void getReturnType(ServerInterface &srvInterface, const SizedColumnTypes &inputTypes,
SizedColumnTypes &outputTypes)
{
// This function accepts only one argument
// Complain if we find a different number
std::vector<size_t> argCols;
inputTypes.getArgumentColumns(argCols); // get argument column indices
if (argCols.size() != 1)
{
vt_report_error(0, "Only one argument is expected but %s provided",
argCols.size()? std::to_string(argCols.size()).c_str() : "none");
}
// Define output type the same as argument type
outputTypes.addArg(inputTypes.getColumnType(argCols[0]), inputTypes.getColumnName(argCols[0]));
}
函数实施
分析函数本身与类型无关:
void processPartition(ServerInterface &srvInterface, AnalyticPartitionReader &inputReader,
AnalyticPartitionWriter &outputWriter)
{
try {
const SizedColumnTypes &inTypes = inputReader.getTypeMetaData();
std::vector<size_t> argCols; // Argument column indexes.
inTypes.getArgumentColumns(argCols);
vint currentRow = 1;
bool nthRowExists = false;
// Find the value of the n-th row
do {
if (currentRow == this->n) {
nthRowExists = true;
break;
} else {
currentRow++;
}
} while (inputReader.next());
if (nthRowExists) {
do {
// Return n-th value
outputWriter.copyFromInput(0 /*dest column*/, inputReader,
argCols[0] /*source column*/);
} while (outputWriter.next());
} else {
// The partition has less than n rows
// Return NULL value
do {
outputWriter.setNull(0);
} while (outputWriter.next());
}
} catch(std::exception& e) {
// Standard exception. Quit.
vt_report_error(0, "Exception while processing partition: [%s]", e.what());
}
}
};