这是本节的多页打印视图。 点击此处打印.

返回本页常规视图.

分析函数 (UDAnF)

用户定义的分析函数 (UDAnF) 用于分析。有关 Vertica 内置分析的概述,请参阅 SQL 分析。与用户定义的标量函数 (UDSF) 一样,UDAnF 也必须为读取的每个数据行输出单个值,并且不能超过 9800 个实参。

与 UDSF 不同的是,UDAnF 的输入读取器和输出读取器可以单独前进。使用此功能,可以创建基于多个数据行计算输出值的分析功能。通过使读取器和写入器单独前进,可以创建与内置的分析函数(例如 LAG,此函数使用前面的行中的数据来输出当前行的值)相似的函数。

1 - AnalyticFunction 类

AnalyticFunction 类可执行分析处理。子类必须定义用于执行操作的 processPartition() 方法。该类可定义多种方法以设置和分解该函数。

执行操作

processPartition() 方法可读取数据分区,执行某种处理,以及为每个输入行输出单个值。

Vertica 将为每个数据分区调用 processPartition() 一次。它使用 AnalyticPartitionReader 对象来提供分区,您可以从该对象读取输入数据。此外,该对象上存在名为 isNewOrderByKey() 的唯一方法,此方法可返回布尔值以指示函数是否已发现具有一个或多个相同 ORDER BY 键的行。此方法对分析函数(例如示例 RANK 函数)很有用,分析函数需要以不同方式处理具有相同 ORDER BY 键的行和具有不同 ORDER BY 键的行。

方法已完成处理数据行之后,您可以通过对 AnalyticPartitionReader 调用 next() 使其前进到下一个输入行。

方法使用 Vertica 作为参数提供给 processPartition()AnalyticPartitionWriter 对象来写入输出值。此对象具有特定于数据类型的方法(例如 setInt())用于写入输出值。设置输出值之后,对 AnalyticPartitionWriter 调用 next() 以前进到下一个输出行。

设置和分解

AnalyticFunction 类定义了两种其他方法,您可以选择性地实施这两种方法以分配和释放资源: setup()destroy()。您应使用这些方法来分配和取消分配那些不通过 UDx API 分配的资源(有关详细信息,请参阅为 UDx 分配资源)。

API

AnalyticFunction API 提供了以下通过子类扩展的方法:

virtual void setup(ServerInterface &srvInterface,
        const SizedColumnTypes &argTypes);

virtual void processPartition (ServerInterface &srvInterface,
        AnalyticPartitionReader &input_reader,
        AnalyticPartitionWriter &output_writer)=0;

virtual void cancel(ServerInterface &srvInterface);

virtual void destroy(ServerInterface &srvInterface, const SizedColumnTypes &argTypes);

AnalyticFunction API 提供了以下通过子类扩展的方法:

public void setup(ServerInterface srvInterface, SizedColumnTypes argTypes);

public abstract void processPartition (ServerInterface srvInterface,
        AnalyticPartitionReader input_reader, AnalyticPartitionWriter output_writer)
        throws UdfException, DestroyInvocation;

protected void cancel(ServerInterface srvInterface);

public void destroy(ServerInterface srvInterface, SizedColumnTypes argTypes);

2 - AnalyticFunctionFactory 类

AnalyticFunctionFactory 类将向 Vertica 提供有关 UDAnF 的元数据:其参数数量和数据类型及其返回值的数据类型。该类还会实例化 AnalyticFunction 的子类。

AnalyticFunctionFactory 子类必须实施以下方法:

  • getPrototype() 可描述函数的输入参数和输出值。可以通过对传递到方法的两个 ColumnTypes 对象调用函数来设置这些值。

  • createAnalyticFunction() 可提供 AnalyticFunction 的实例,Vertica 可以调用此实例以处理 UDAnF 函数调用。

  • getReturnType() 可提供有关函数输出的详细信息。通过此方法,您可以设置输出值的宽度(如果函数将返回可变宽度值,例如 VARCHAR),或者设置输出值的精度(如果该输出值具有可设置的精度,例如 TIMESTAMP)。

API

AnalyticFunctionFactory API 提供了以下通过子类扩展的方法:

virtual AnalyticFunction * createAnalyticFunction (ServerInterface &srvInterface)=0;

virtual void getPrototype(ServerInterface &srvInterface,
        ColumnTypes &argTypes, ColumnTypes &returnType)=0;

virtual void getReturnType(ServerInterface &srvInterface,
        const SizedColumnTypes &argTypes, SizedColumnTypes &returnType)=0;

virtual void getParameterType(ServerInterface &srvInterface,
        SizedColumnTypes &parameterTypes);

AnalyticFunctionFactory API 提供了以下通过子类扩展的方法:

public abstract AnalyticFunction createAnalyticFunction (ServerInterface srvInterface);

public abstract void getPrototype(ServerInterface srvInterface, ColumnTypes argTypes, ColumnTypes returnType);

public abstract void getReturnType(ServerInterface srvInterface, SizedColumnTypes argTypes,
        SizedColumnTypes returnType) throws UdfException;

public void getParameterType(ServerInterface srvInterface, SizedColumnTypes parameterTypes);

3 - C++ 示例:排名

Rank 分析函数根据行的排序顺序对其进行排序。此 UDx 的 Java 版本包含在 /opt/vertica/sdk/examples 中。

加载和使用示例

以下示例显示了如何将函数加载至 Vertica 中。假设包含该函数的 AnalyticFunctions.so 库已复制到启动程序节点上数据库管理员用户的主目录中。

=> CREATE LIBRARY AnalyticFunctions AS '/home/dbadmin/AnalyticFunctions.so';
CREATE LIBRARY
=> CREATE ANALYTIC FUNCTION an_rank AS LANGUAGE 'C++'
   NAME 'RankFactory' LIBRARY AnalyticFunctions;
CREATE ANALYTIC FUNCTION

以下是使用此 rank 函数(名为 an_rank)的示例:

=> SELECT * FROM hits;
      site       |    date    | num_hits
-----------------+------------+----------
 www.example.com | 2012-01-02 |       97
 www.vertica.com | 2012-01-01 |   343435
 www.example.com | 2012-01-01 |      123
 www.example.com | 2012-01-04 |      112
 www.vertica.com | 2012-01-02 |   503695
 www.vertica.com | 2012-01-03 |   490387
 www.example.com | 2012-01-03 |      123
(7 rows)
=> SELECT site,date,num_hits,an_rank()
   OVER (PARTITION BY site ORDER BY num_hits DESC)
   AS an_rank FROM hits;
      site       |    date    | num_hits | an_rank
-----------------+------------+----------+---------
 www.example.com | 2012-01-03 |      123 |       1
 www.example.com | 2012-01-01 |      123 |       1
 www.example.com | 2012-01-04 |      112 |       3
 www.example.com | 2012-01-02 |       97 |       4
 www.vertica.com | 2012-01-02 |   503695 |       1
 www.vertica.com | 2012-01-03 |   490387 |       2
 www.vertica.com | 2012-01-01 |   343435 |       3
(7 rows)

与内置的 RANK 分析函数一样,在 ORDER BY 列(此示例中的 num_hits)具有相同值的行具有相同排名,但排名会持续增加,以便下一个具有不同 ORDER BY 键的行可基于其前面的行数获得排名值。

AnalyticFunction 实施

以下代码定义了一个名为 RankAnalyticFunction 子类。该子类基于 SDK 示例目录中分发的示例代码。

/**
 * User-defined analytic function: Rank - works mostly the same as SQL-99 rank
 * with the ability to define as many order by columns as desired
 *
 */
class Rank : public AnalyticFunction
{
    virtual void processPartition(ServerInterface &srvInterface,
                                  AnalyticPartitionReader &inputReader,
                                  AnalyticPartitionWriter &outputWriter)
    {
        // Always use a top-level try-catch block to prevent exceptions from
        // leaking back to Vertica or the fenced-mode side process.
        try {
            rank = 1; // The rank to assign a row
            rowCount = 0; // Number of rows processed so far
            do {
                rowCount++;
                // Do we have a new order by row?
                if (inputReader.isNewOrderByKey()) {
                    // Yes, so set rank to the total number of rows that have been
                    // processed. Otherwise, the rank remains the same value as
                    // the previous iteration.
                    rank = rowCount;
                }
                // Write the rank
                outputWriter.setInt(0, rank);
                // Move to the next row of the output
                outputWriter.next();
            } while (inputReader.next()); // Loop until no more input
        } catch(exception& e) {
            // Standard exception. Quit.
            vt_report_error(0, "Exception while processing partition: %s", e.what());
        }
    }
private:
    vint rank, rowCount;
};

在此示例中,processPartition() 方法实际上不读取输入行中的任何数据;只会遍历这些行。该方法不需要读取数据;它只需要计算已读取的行数并确定这些行是否具有与上一行相同的 ORDER BY 键。如果当前行为新的 ORDER BY 键,则排名设置为已处理的总行数。如果当前行与上一行的 ORDER BY 值相同,则排名保持不变。

请注意,此函数包含顶级 try-catch 块。所有 UDx 函数都应始终包含该块,以防止偶然发生的异常传递回 Vertica(如果在非隔离模式下运行函数)或从属进程。

AnalyticFunctionFactory 实施

以下代码定义了与 Rank 分析函数对应的 AnalyticFunctionFactory

class RankFactory : public AnalyticFunctionFactory
{
    virtual void getPrototype(ServerInterface &srvInterface,
                                ColumnTypes &argTypes, ColumnTypes &returnType)
    {
        returnType.addInt();
    }
    virtual void getReturnType(ServerInterface &srvInterface,
                               const SizedColumnTypes &inputTypes,
                               SizedColumnTypes &outputTypes)
    {
        outputTypes.addInt();
    }
    virtual AnalyticFunction *createAnalyticFunction(ServerInterface
                                                        &srvInterface)
    { return vt_createFuncObj(srvInterface.allocator, Rank); }
};

RankFactory 子类定义的第一种方法 getPrototype() 设置了返回值的数据类型。因为 Rank UDAnF 不读取输入内容,因此不会通过对传入 argTypes 参数的 ColumnTypes 对象调用方法定义任何实参。

下一种方法是 getReturnType()。如果函数返回需要定义宽度或精度的数据类型,则 getReturnType() 方法的实施将对作为参数传入的 SizedColumnType 对象调用某个方法,以向 Vertica 说明该宽度或精度。 Rank 将返回固定宽度的数据类型 (INTEGER),因此无需设置其输出的精度或宽度;它只是调用 addInt() 以报告其输出数据类型而已。

最后,RankFactory 定义了 createAnalyticFunction() 方法,该方法会返回一个 Vertica 可以调用的 AnalyticFunction 类的实例。此代码大部分是样板。您只需在对 vt_createFuncObj() 发出的调用中添加分析函数类的名称即可,此子类将为您分配对象。