这是本节的多页打印视图。 点击此处打印.

返回本页常规视图.

标量函数 (UDSF)

用户定义的标量函数 (UDSF) 为读取的每个数据行返回单个值。您可以在任何可使用内置 Vertica 函数的地方使用 UDSF。您通常可以开发 UDSF 来执行在使用 SQL 语句和函数时因过于复杂或速度过慢而难以执行的数据处理。UDSF 还可让您在 Vertica 中使用由第三方库提供的分析函数,并同时保持高性能。

UDSF 返回单个列。您可以在 ROW 中自动返回多个值。ROW 是一组属性值对。在以下示例中,div_with_rem 是一个执行除法运算的 UDSF,将商和余数作为整数返回:

=> SELECT div_with_rem(18,5);
        div_with_rem
------------------------------
 {"quotient":3,"remainder":3}
(1 row)

从 UDSF 返回的 ROW 不能用作 COUNT 的实参。

或者,您可以自己构造一个复杂的返回值,如复杂类型作为实参中所述。

UDSF 必须为每个输入行返回值(除非生成错误;有关详细信息,请参阅处理错误)。未能为输入行返回值会导致不正确的结果,如果未在 隔离和非隔离模式 中运行,可能会破坏 Vertica 服务器的稳定性。

一个 UDSF 最多可以有 9800 个实参。

1 - ScalarFunction 类

ScalarFunction 类是 UDSF 的核心。子类必须定义用于执行标量操作的 processBlock() 方法。该类可定义多种方法以设置和分解该函数。

对于使用 C++ 语言编写的标量函数,您可以提供有助于优化查询的信息。请参阅提高查询性能(仅限 C++)

执行操作

processBlock() 方法可执行您希望 UDSF 执行的所有处理。当用户在 SQL 语句中调用您的函数时,Vertica 会将来自函数参数的数据绑定在一起并将其传递至 processBlock()

processBlock() 方法的输入和输出由 BlockReaderBlockWriter 类的对象提供。这些类定义了用于为 UDSF 读取输入数据和写入输出数据的方法。

开发 UDSF 的大部分工作是编写 processBlock()。对函数的所有处理在此阶段进行。UDSF 应遵循以下基本模式:

  • 使用特定于数据类型的方法从 BlockReader 对象读入一组实参。

  • 以某种方式处理数据。

  • 使用 BlockWriter 类的特定于数据类型的方法之一输出结果值。

  • 通过调用 BlockWriter.next()BlockReader.next() 分别前进到下一个输出行和下一个输入行。

此过程将持续到没有更多数据行可供读取为止(BlockReader.next() 将返回 false)。

您必须确保 processBlock() 读取所有输入行并为每个行输出单个值。如未遵守此规则,将损坏 Vertica 为获取 UDSF 的输出而读取的数据结构。此规则的唯一例外出现在 processBlock() 函数将错误报告回 Vertica 时(请参阅处理错误)。在这种情况下,Vertica 不会尝试读取由 UDSF 生成的不完整结果集。

设置和分解

ScalarFunction 类定义了两种其他方法,您可以选择性地实施这两种方法以分配和释放资源: setup()destroy()。您应使用这些方法来分配和取消分配那些不通过 UDx API 分配的资源(有关详细信息,请参阅为 UDx 分配资源)。

注意

  • 虽然为 ScalarFunction 子类选择的名称不必与稍后为其分配的 SQL 函数名称匹配,但 Vertica 会将名称设为相同视为最佳实践。

  • 请勿假设系统会从将函数实例化的同一个线程调用该函数。

  • 可以调用 ScalarFunction 子类的同一个实例以处理多个数据块。

  • 不保证发送到 processBlock() 的输入行具有特定顺序。

  • 写入太多输出行会导致 Vertica 发出超过边界错误。

API

ScalarFunction API 提供了以下通过子类扩展的方法:

virtual void setup(ServerInterface &srvInterface,
        const SizedColumnTypes &argTypes);

virtual void processBlock(ServerInterface &srvInterface,
        BlockReader &arg_reader, BlockWriter &res_writer)=0;

virtual void getOutputRange (ServerInterface &srvInterface,
        ValueRangeReader &inRange, ValueRangeWriter &outRange)

virtual void cancel(ServerInterface &srvInterface);

virtual void destroy(ServerInterface &srvInterface, const SizedColumnTypes &argTypes);

ScalarFunction API 提供了以下通过子类扩展的方法:

public void setup(ServerInterface srvInterface, SizedColumnTypes argTypes);

public abstract void processBlock(ServerInterface srvInterface, BlockReader arg_reader,
        BlockWriter res_writer) throws UdfException, DestroyInvocation;

protected void cancel(ServerInterface srvInterface);

public void destroy(ServerInterface srvInterface, SizedColumnTypes argTypes);

ScalarFunction API 提供了以下通过子类扩展的方法:


def setup(self, server_interface, col_types)

def processBlock(self, server_interface, block_reader, block_writer)

def destroy(self, server_interface, col_types)

实施 主函数 API 以定义标量函数:

FunctionName <- function(input.data.frame, parameters.data.frame) {
  # Computations

  # The function must return a data frame.
  return(output.data.frame)
}

2 - ScalarFunctionFactory 类

ScalarFunctionFactory 类将向 Vertica 提供有关 UDSF 的元数据:其参数数量和数据类型及其返回值的数据类型。该类还会实例化 ScalarFunction 的子类。

方法

您必须在 ScalarFunctionFactory 子类中实施以下方法:

  • createScalarFunction() 实例化 ScalarFunction 子类。如果采用 C++ 进行编写,则您可以使用 ScalarFunction 子类的名称调用 vt_createFuncObj 宏。此宏会为您分配类并将该类实例化。

  • getPrototype() 会向 Vertica 提供有关 UDSF 的参数和返回类型。除了一个 ServerInterface 对象之外,此方法还获取另外两个 ColumnTypes 对象。在此函数中唯一需要执行的操作是,对这两个对象调用类函数以构建参数列表和返回值类型。如果返回多个值,则将结果打包为 ROW 类型。

定义工厂类之后,您需要调用 RegisterFactory 宏。此宏可将工厂类的成员实例化,以便 Vertica 可以与该成员交互并提取其中包含的有关 UDSF 的元数据。

声明返回值

如果函数返回特定大小的列(一种长度可变的返回数据类型,例如 VARCHAR)、需要精度的值或多个值,则必须实施 getReturnType()。此方法由 Vertica 调用,以用于查找在每个结果行中返回的数据的长度和精度。此方法的返回值取决于 processBlock() 方法所返回的数据类型:

  • CHAR、(LONG) VARCHAR、BINARY 和 (LONG) VARBINARY 将返回最大长度。

  • NUMERIC 类型可指定精度和小数位数。

  • TIME 和 TIMESTAMP 值可指定精度(无论是否带有时区均可)。

  • INTERVAL YEAR TO MONTH 可指定范围。

  • INTERVAL DAY TO SECOND 可指定精度和范围。

  • ARRAY 类型将指定元素的最大数量。

如果 UDSF 不返回以上数据类型之一,而返回单个值,则它不需要实施 getReturnType() 方法。

传递到 getReturnType() 方法的输入是一个 SizedColumnTypes 对象,其中包含输入参数类型及其长度。此对象将传递到 processBlock() 函数的一个实例。getReturnType() 的实施必须从该输入提取数据类型和长度,并确定输出行的长度或精度,然后将此信息保存在 SizedColumnTypes 类的另一个实例中。

API

ScalarFunctionFactory API 提供了以下通过子类扩展的方法:

virtual ScalarFunction * createScalarFunction(ServerInterface &srvInterface)=0;

virtual void getPrototype(ServerInterface &srvInterface,
        ColumnTypes &argTypes, ColumnTypes &returnType)=0;

virtual void getReturnType(ServerInterface &srvInterface,
        const SizedColumnTypes &argTypes, SizedColumnTypes &returnType);

virtual void getParameterType(ServerInterface &srvInterface,
        SizedColumnTypes &parameterTypes);

ScalarFunctionFactory API 提供了以下通过子类扩展的方法:

public abstract ScalarFunction createScalarFunction(ServerInterface srvInterface);

public abstract void getPrototype(ServerInterface srvInterface, ColumnTypes argTypes, ColumnTypes returnType);

public void getReturnType(ServerInterface srvInterface, SizedColumnTypes argTypes,
        SizedColumnTypes returnType) throws UdfException;

public void getParameterType(ServerInterface srvInterface, SizedColumnTypes parameterTypes);

ScalarFunctionFactory API 提供了以下通过子类扩展的方法:

def createScalarFunction(self, srv)

def getPrototype(self, srv_interface, arg_types, return_type)

def getReturnType(self, srv_interface, arg_types, return_type)

实施 工厂函数 API 以定义标量函数工厂:

FunctionNameFactory <- function() {
  list(name    = FunctionName,
       udxtype = c("scalar"),
       intype  = c("int"),
       outtype = c("int"))
}

3 - 设置 null 输入和波动性行为

一般情况下,Vertica 会为查询中的每个数据行调用 UDSF。在某些情况下,Vertica 可以避免执行您的 UDSF。您可以告知 Vertica 何时能够跳过函数调用,而只需通过更改函数的波动性和严格性设置来提供返回值本身。

  • 函数的波动性 是指在传递相同的参数时,它是否始终返回相同的输出值。根据其行为,Vertica 可以缓存实参和返回值。如果用户使用相同的实参集来调用 UDSF,Vertica 会返回缓存值,而不会调用 UDSF。

  • 函数的严格性 是指它如何对 NULL 实参作出响应。如果“任何”实参为 NULL 时它始终返回 NULL,则 Vertica 可以只返回 NULL,而无需调用函数。此优化还可以节省工作量,因为您无需在 UDSF 代码中测试并处理 Null 参数。

可以通过在 ScalarFunctionFactory 类的构造函数中设置 volstrict 字段来指示函数的可变性和空值处理。

可变性设置

要指示函数的可变性,请将 vol 字段设置为以下值之一:

示例

以下示例显示了使函数不可变的 Add2ints 示例工厂类的某个版本。

class Add2intsImmutableFactory : public Vertica::ScalarFunctionFactory
{
    virtual Vertica::ScalarFunction *createScalarFunction(Vertica::ServerInterface &srvInterface)
    { return vt_createFuncObj(srvInterface.allocator, Add2ints); }
    virtual void getPrototype(Vertica::ServerInterface &srvInterface,
                              Vertica::ColumnTypes &argTypes,
                              Vertica::ColumnTypes &returnType)
    {
        argTypes.addInt();
        argTypes.addInt();
        returnType.addInt();
    }

public:
    Add2intsImmutableFactory() {vol = IMMUTABLE;}
};
RegisterFactory(Add2intsImmutableFactory);

以下示例演示了将 Add2IntsFactoryvol 字段设置为 IMMUTABLE 以向 Vertica 说明可以缓存实参和返回值。

public class Add2IntsFactory extends ScalarFunctionFactory {

    @Override
    public void getPrototype(ServerInterface srvInterface, ColumnTypes argTypes, ColumnTypes returnType){
        argTypes.addInt();
        argTypes.addInt();
        returnType.addInt();
    }

    @Override
    public ScalarFunction createScalarFunction(ServerInterface srvInterface){
        return new Add2Ints();
    }

    // Class constructor
    public Add2IntsFactory() {
        // Tell Vertica that the same set of arguments will always result in the
        // same return value.
        vol = volatility.IMMUTABLE;
    }
}

Null 输入行为

要指示函数如何响应 Null 输入,请将 strictness 字段设置为以下值之一:

示例

以下 C++ 示例演示了设置 Add2ints 的 null 行为以使 Vertica 不调用带有 NULL 值的函数。

class Add2intsNullOnNullInputFactory : public Vertica::ScalarFunctionFactory
{
    virtual Vertica::ScalarFunction *createScalarFunction(Vertica::ServerInterface &srvInterface)
    { return vt_createFuncObj(srvInterface.allocator, Add2ints); }
    virtual void getPrototype(Vertica::ServerInterface &srvInterface,
                              Vertica::ColumnTypes &argTypes,
                              Vertica::ColumnTypes &returnType)
    {
        argTypes.addInt();
        argTypes.addInt();
        returnType.addInt();
    }

public:
    Add2intsNullOnNullInputFactory() {strict = RETURN_NULL_ON_NULL_INPUT;}
};
RegisterFactory(Add2intsNullOnNullInputFactory);

4 - 提高查询性能(仅限 C++)

在评估查询时,Vertica 可以利用有关值范围的可用信息。例如,如果数据已分区并且查询通过分区值限制输出,则 Vertica 可以忽略不可能包含满足查询的数据的分区。同样,对于标量函数,Vertica 可以不处理数据中函数返回的值不可能影响结果的行。

考虑一个表(表中包含数百万行客户订单数据)和一个标量函数(该函数计算为订单中的所有项目支付的总价格)。查询使用 WHERE 子句将结果限制为高于给定值的订单。对数据块调用标量函数;如果该块中没有行可以产生目标值,则跳过该块的处理可以提高查询性能。

用 C++ 编写的标量函数可以实施 getOutputRange 方法。在调用 processBlock 之前,Vertica 调用 getOutputRange 以在给定输入范围的情况下确定此块的最小和最大返回值。然后,它决定是否调用 processBlock 来执行计算。

Add2Ints 示例实施了此函数。最小输出值是两个输入的最小值之和,最大输出值是每个输入的最大值之和。此函数不考虑单个行。考虑以下输入:

   a  |  b
------+------
  21  | 92
 500  | 19
 111  | 11

两个输入的最小值是 21 和 11,因此函数将 32 报告为输出范围的下限。最大输入值为 500 和 92,因此它将 592 报告为输出范围的上限。任何输入行的返回值均大于 32,小于 592。

getOutputRange 的目的是快速消除输出肯定超出范围的调用。例如,如果查询包含 "WHERE Add2Ints(a,b) > 600",则可以跳过该数据块。仍然可能存在调用 getOutputRangeprocessBlock 没有返回结果的情况。如果查询包含 "WHERE Add2Ints(a,b) > 500",则 getOutputRange 不会消除此数据块。

Add2Ints 实施 getOutputRange,如下所示:


    /*
     * This method computes the output range for this scalar function from
     *   the ranges of its inputs in a single invocation.
     *
     * The input ranges are retrieved via inRange
     * The output range is returned via outRange
     */
    virtual void getOutputRange(Vertica::ServerInterface &srvInterface,
                                Vertica::ValueRangeReader &inRange,
                                Vertica::ValueRangeWriter &outRange)
    {
        if (inRange.hasBounds(0) && inRange.hasBounds(1)) {
            // Input ranges have bounds defined
            if (inRange.isNull(0) || inRange.isNull(1)) {
                // At least one range has only NULL values.
                // Output range can only have NULL values.
                outRange.setNull();
                outRange.setHasBounds();
                return;
            } else {
                // Compute output range
                const vint& a1LoBound = inRange.getIntRefLo(0);
                const vint& a2LoBound = inRange.getIntRefLo(1);
                outRange.setIntLo(a1LoBound + a2LoBound);

                const vint& a1UpBound = inRange.getIntRefUp(0);
                const vint& a2UpBound = inRange.getIntRefUp(1);
                outRange.setIntUp(a1UpBound + a2UpBound);
            }
        } else {
            // Input ranges are unbounded. No output range can be defined
            return;
        }

        if (!inRange.canHaveNulls(0) && !inRange.canHaveNulls(1)) {
            // There cannot be NULL values in the output range
            outRange.setCanHaveNulls(false);
        }

        // Let Vertica know that the output range is bounded
        outRange.setHasBounds();
    }

如果 getOutputRange 产生错误,Vertica 会发出警告并且不会为当前查询再次调用该方法。

5 - C++ 示例:Add2Ints

以下示例显示了基本的 ScalarFunction 子类,它名为 Add2ints。顾名思义,它将两个整数相加,并返回单个整数结果。

有关完整的源代码,请参阅/opt/vertica/sdk/examples/ScalarFunctions/Add2Ints.cpp。此 UDx 的 Java 和 Python 版本包含在 /opt/vertica/sdk/examples 中。

加载和使用示例

使用 CREATE LIBRARY 加载包含函数的库,然后按下例所示使用 CREATE FUNCTION(标量) 声明该函数:

=> CREATE LIBRARY ScalarFunctions AS '/home/dbadmin/examples/ScalarFunctions.so';

=> CREATE FUNCTION add2ints AS LANGUAGE 'C++' NAME 'Add2IntsFactory' LIBRARY ScalarFunctions;

以下示例显示了如何使用该函数:

=> SELECT Add2Ints(27,15);
 Add2ints
----------
       42
(1 row)
=> SELECT * FROM MyTable;
  a  | b
-----+----
   7 |  0
  12 |  2
  12 |  6
  18 |  9
   1 |  1
  58 |  4
 450 | 15
(7 rows)
=> SELECT * FROM MyTable WHERE Add2ints(a, b) > 20;
  a  | b
-----+----
  18 |  9
  58 |  4
 450 | 15
(3 rows)

函数实施

标量函数在 processBlock 方法中进行计算:


class Add2Ints : public ScalarFunction
 {
    public:
   /*
     * This method processes a block of rows in a single invocation.
     *
     * The inputs are retrieved via argReader
     * The outputs are returned via resWriter
     */
    virtual void processBlock(ServerInterface &srvInterface,
                              BlockReader &argReader,
                              BlockWriter &resWriter)
    {
        try {
            // While we have inputs to process
            do {
                if (argReader.isNull(0) || argReader.isNull(1)) {
                    resWriter.setNull();
                } else {
                    const vint a = argReader.getIntRef(0);
                    const vint b = argReader.getIntRef(1);
                    resWriter.setInt(a+b);
                }
                resWriter.next();
            } while (argReader.next());
        } catch(std::exception& e) {
            // Standard exception. Quit.
            vt_report_error(0, "Exception while processing block: [%s]", e.what());
        }
    }

  // ...
};

实施 getOutputRange,这是可选的,允许您的函数跳过结果不在目标范围内的行。例如,如果 WHERE 子句将查询结果限制在某个范围内,则无需为不可能在该范围内的情况调用该函数。


    /*
     * This method computes the output range for this scalar function from
     *   the ranges of its inputs in a single invocation.
     *
     * The input ranges are retrieved via inRange
     * The output range is returned via outRange
     */
    virtual void getOutputRange(Vertica::ServerInterface &srvInterface,
                                Vertica::ValueRangeReader &inRange,
                                Vertica::ValueRangeWriter &outRange)
    {
        if (inRange.hasBounds(0) && inRange.hasBounds(1)) {
            // Input ranges have bounds defined
            if (inRange.isNull(0) || inRange.isNull(1)) {
                // At least one range has only NULL values.
                // Output range can only have NULL values.
                outRange.setNull();
                outRange.setHasBounds();
                return;
            } else {
                // Compute output range
                const vint& a1LoBound = inRange.getIntRefLo(0);
                const vint& a2LoBound = inRange.getIntRefLo(1);
                outRange.setIntLo(a1LoBound + a2LoBound);

                const vint& a1UpBound = inRange.getIntRefUp(0);
                const vint& a2UpBound = inRange.getIntRefUp(1);
                outRange.setIntUp(a1UpBound + a2UpBound);
            }
        } else {
            // Input ranges are unbounded. No output range can be defined
            return;
        }

        if (!inRange.canHaveNulls(0) && !inRange.canHaveNulls(1)) {
            // There cannot be NULL values in the output range
            outRange.setCanHaveNulls(false);
        }

        // Let Vertica know that the output range is bounded
        outRange.setHasBounds();
    }

工厂实施

工厂实例化了类的一个成员 (createScalarFunction),并且还描述了函数的输入和输出 (getPrototype):

class Add2IntsFactory : public ScalarFunctionFactory
{
    // return an instance of Add2Ints to perform the actual addition.
    virtual ScalarFunction *createScalarFunction(ServerInterface &interface)
    { return vt_createFuncObject<Add2Ints>(interface.allocator); }

    // This function returns the description of the input and outputs of the
    // Add2Ints class's processBlock function.  It stores this information in
    // two ColumnTypes objects, one for the input parameters, and one for
    // the return value.
    virtual void getPrototype(ServerInterface &interface,
                              ColumnTypes &argTypes,
                              ColumnTypes &returnType)
    {
        argTypes.addInt();
        argTypes.addInt();

        // Note that ScalarFunctions *always* return a single value.
        returnType.addInt();
    }
};

RegisterFactory 宏

使用 RegisterFactory 宏注册一个 UDx。该宏将对工厂类进行实例化并将其包含的元数据供 Vertica 访问。要调用该宏,请将您的工厂类的名称传递给它。

RegisterFactory(Add2IntsFactory);

6 - Python 示例:currency_convert

currency_convert 标量函数从表中读取两个值,即币种和价值。然后它将项目的价值转换为美元,返回一个浮点结果。

您可以在 Vertica Github 存储库中找到更多 UDx 示例:https://github.com/vertica/UDx-Examples

UDSF Python 代码

import vertica_sdk
import decimal

rates2USD = {'USD': 1.000,
             'EUR': 0.89977,
             'GBP': 0.68452,
             'INR': 67.0345,
             'AUD': 1.39187,
             'CAD': 1.30335,
             'ZAR': 15.7181,
             'XXX': -1.0000}

class currency_convert(vertica_sdk.ScalarFunction):
    """Converts a money column to another currency

    Returns a value in USD.

    """
    def __init__(self):
        pass

    def setup(self, server_interface, col_types):
        pass

    def processBlock(self, server_interface, block_reader, block_writer):
        while(True):
            currency = block_reader.getString(0)
            try:
                rate = decimal.Decimal(rates2USD[currency])

            except KeyError:
                server_interface.log("ERROR: {} not in dictionary.".format(currency))
                # Scalar functions always need a value to move forward to the
                # next input row. Therefore, we need to assign it a value to
                # move beyond the error.
                currency = 'XXX'
                rate = decimal.Decimal(rates2USD[currency])

            starting_value = block_reader.getNumeric(1)
            converted_value = decimal.Decimal(starting_value  / rate)
            block_writer.setNumeric(converted_value)
            block_writer.next()
            if not block_reader.next():
                break

    def destroy(self, server_interface, col_types):
        pass

class currency_convert_factory(vertica_sdk.ScalarFunctionFactory):

    def createScalarFunction(self, srv):
        return currency_convert()

    def getPrototype(self, srv_interface, arg_types, return_type):
        arg_types.addVarchar()
        arg_types.addNumeric()
        return_type.addNumeric()

    def getReturnType(self, srv_interface, arg_types, return_type):
        return_type.addNumeric(9,4)

加载函数和库

创建库和函数。

=> CREATE LIBRARY pylib AS '/home/dbadmin/python_udx/currency_convert/currency_convert.py' LANGUAGE 'Python';
CREATE LIBRARY
=> CREATE FUNCTION currency_convert AS LANGUAGE 'Python' NAME 'currency_convert_factory' LIBRARY pylib fenced;
CREATE FUNCTION

使用函数查询数据

以下查询显示了如何使用 UDSF 运行查询。

=> SELECT product, currency_convert(currency, value) AS cost_in_usd
    FROM items;
   product    | cost_in_usd
--------------+-------------
 Shoes        |    133.4008
 Soccer Ball  |    110.2817
 Coffee       |     13.5190
 Surfboard    |    176.2593
 Hockey Stick |     76.7177
 Car          |  17000.0000
 Software     |     10.4424
 Hamburger    |      7.5000
 Fish         |    130.4272
 Cattle       |    269.2367
(10 rows)

7 - Python 示例:validate_url

validate_url 标量函数会从表中读取字符串,即 URL。然后,它会验证该 URL 是否会做出响应,以返回状态代码或指示尝试失败的字符串。

您可以在 Vertica Github 存储库中找到更多 UDx 示例:https://github.com/vertica/UDx-Examples

UDSF Python 代码


import vertica_sdk
import urllib.request
import time

class validate_url(vertica_sdk.ScalarFunction):
    """Validates HTTP requests.

    Returns the status code of a webpage. Pages that cannot be accessed return
    "Failed to load page."

    """

    def __init__(self):
        pass

    def setup(self, server_interface, col_types):
        pass

    def processBlock(self, server_interface, arg_reader, res_writer):
        # Writes a string to the UDx log file.
        server_interface.log("Validating webpage accessibility - UDx")

        while(True):
            url = arg_reader.getString(0)
            try:
                status = urllib.request.urlopen(url).getcode()
                # Avoid overwhelming web servers -- be nice.
                time.sleep(2)
            except (ValueError, urllib.error.HTTPError, urllib.error.URLError):
                status = 'Failed to load page'
            res_writer.setString(str(status))
            res_writer.next()
            if not arg_reader.next():
                # Stop processing when there are no more input rows.
                break

    def destroy(self, server_interface, col_types):
        pass

class validate_url_factory(vertica_sdk.ScalarFunctionFactory):

    def createScalarFunction(self, srv):
        return validate_url()

    def getPrototype(self, srv_interface, arg_types, return_type):
        arg_types.addVarchar()
        return_type.addChar()

    def getReturnType(self, srv_interface, arg_types, return_type):
        return_type.addChar(20)

加载函数和库

创建库和函数。

=> CREATE OR REPLACE LIBRARY pylib AS 'webpage_tester/validate_url.py' LANGUAGE 'Python';
=> CREATE OR REPLACE FUNCTION validate_url AS LANGUAGE 'Python' NAME 'validate_url_factory' LIBRARY pylib fenced;

使用函数查询数据

以下查询显示了如何使用 UDSF 运行查询。

=> SELECT url, validate_url(url) AS url_status FROM webpages;
                     url                       |      url_status
-----------------------------------------------+----------------------
 http://www.vertica.com/documentation/vertica/ | 200
 http://www.google.com/                        | 200
 http://www.mass.gov.com/                      | Failed to load page
 http://www.espn.com                           | 200
 http://blah.blah.blah.blah                    | Failed to load page
 http://www.microfocus.com/                    | 200
(6 rows)

8 - Python 示例:矩阵乘法

Python UDx 可以接受并返回复杂类型。MatrixMultiply 类会乘以输入矩阵,并返回生成的矩阵乘积。这些矩阵将以二维数组来表示。为了执行矩阵乘法运算,第一个输入矩阵中的列数必须等于第二个输入矩阵中的行数。

完整的源代码位于 /opt/vertica/sdk/examples/python/ScalarFunctions.py 中。

加载和使用示例

加载库并创建函数,如下所示:

=> CREATE OR REPLACE LIBRARY ScalarFunctions AS '/home/dbadmin/examples/python/ScalarFunctions.py' LANGUAGE 'Python';

=> CREATE FUNCTION MatrixMultiply AS LANGUAGE 'Python' NAME 'matrix_multiply_factory' LIBRARY ScalarFunctions;

您可以创建输入矩阵,然后调用诸如以下函数:


=> CREATE TABLE mn (id INTEGER, data ARRAY[ARRAY[INTEGER, 3], 2]);
CREATE TABLE

=> CREATE TABLE np (id INTEGER, data ARRAY[ARRAY[INTEGER, 2], 3]);
CREATE TABLE

=> COPY mn FROM STDIN PARSER fjsonparser();
{"id": 1, "data": [[1, 2, 3], [4, 5, 6]] }
{"id": 2, "data": [[7, 8, 9], [10, 11, 12]] }
\.

=> COPY np FROM STDIN PARSER fjsonparser();
{"id": 1, "data": [[0, 0], [0, 0], [0, 0]] }
{"id": 2, "data": [[1, 1], [1, 1], [1, 1]] }
{"id": 3, "data": [[2, 0], [0, 2], [2, 0]] }
\.

=> SELECT mn.id, np.id, MatrixMultiply(mn.data, np.data) FROM mn CROSS JOIN np ORDER BY 1, 2;
id | id |   MatrixMultiply
---+----+-------------------
1  |  1 | [[0,0],[0,0]]
1  |  2 | [[6,6],[15,15]]
1  |  3 | [[8,4],[20,10]]
2  |  1 | [[0,0],[0,0]]
2  |  2 | [[24,24],[33,33]]
2  |  3 | [[32,16],[44,22]]
(6 rows)

设置

所有 Python UDx 都必须导入 Vertica SDK 库:

import vertica_sdk

工厂实施

getPrototype() 方法会声明函数实参和返回类型都必须为二维数组,以整数数组的数组来表示:


def getPrototype(self, srv_interface, arg_types, return_type):
    array1dtype = vertica_sdk.ColumnTypes.makeArrayType(vertica_sdk.ColumnTypes.makeInt())
    arg_types.addArrayType(array1dtype)
    arg_types.addArrayType(array1dtype)
    return_type.addArrayType(array1dtype)

getReturnType() 验证乘积矩阵的行数是否与第一个输入矩阵相同,以及列数是否与第二个输入矩阵相同:


def getReturnType(self, srv_interface, arg_types, return_type):
    (_, a1type) = arg_types[0]
    (_, a2type) = arg_types[1]
    m = a1type.getArrayBound()
    p = a2type.getElementType().getArrayBound()
    return_type.addArrayType(vertica_sdk.SizedColumnTypes.makeArrayType(vertica_sdk.SizedColumnTypes.makeInt(), p), m)

函数实施

使用名称分别为 arg_readerres_writerBlockReaderBlockWriter 来调用 processBlock() 方法。为了访问输入数组的元素,该方法会使用 ArrayReader 实例。数组是嵌套的,因此必须为外部和内部数组实例化 ArrayReader。列表推导式简化了将输入数组读取到列表中的过程。该方法会执行计算,然后使用 ArrayWriter 实例来构造乘积矩阵。


def processBlock(self, server_interface, arg_reader, res_writer):
    while True:
        lmat = [[cell.getInt(0) for cell in row.getArrayReader(0)] for row in arg_reader.getArrayReader(0)]
        rmat = [[cell.getInt(0) for cell in row.getArrayReader(0)] for row in arg_reader.getArrayReader(1)]
        omat = [[0 for c in range(len(rmat[0]))] for r in range(len(lmat))]

        for i in range(len(lmat)):
            for j in range(len(rmat[0])):
                for k in range(len(rmat)):
                    omat[i][j] += lmat[i][k] * rmat[k][j]

        res_writer.setArray(omat)
        res_writer.next()

        if not arg_reader.next():
            break

9 - R 示例: SalesTaxCalculator

SalesTaxCalculator 标量函数会从表中读取浮点数和可变长字符串,即商品的价格和州名缩写。然后,它会使用州名缩写从列表中查找销售税率并计算商品的价格(包括所在州的销售税),以返回商品的总成本。

您可以在 Vertica Github 存储库中找到更多 UDx 示例:https://github.com/vertica/UDx-Examples

加载函数和库

创建库和函数。

=> CREATE OR REPLACE LIBRARY rLib AS 'sales_tax_calculator.R' LANGUAGE 'R';
CREATE LIBRARY
=> CREATE OR REPLACE FUNCTION SalesTaxCalculator AS LANGUAGE 'R' NAME 'SalesTaxCalculatorFactory' LIBRARY rLib FENCED;
CREATE FUNCTION

使用函数查询数据

以下查询显示了如何使用 UDSF 运行查询。

=> SELECT item, state_abbreviation,
          price, SalesTaxCalculator(price, state_abbreviation) AS Price_With_Sales_Tax
    FROM inventory;
    item     | state_abbreviation | price | Price_With_Sales_Tax
-------------+--------------------+-------+---------------------
 Scarf       | AZ                 |  6.88 |             7.53016
 Software    | MA                 | 88.31 |           96.655295
 Soccer Ball | MS                 | 12.55 |           13.735975
 Beads       | LA                 |  0.99 |            1.083555
 Baseball    | TN                 | 42.42 |            46.42869
 Cheese      | WI                 | 20.77 |           22.732765
 Coffee Mug  | MA                 |  8.99 |            9.839555
 Shoes       | TN                 | 23.99 |           26.257055
(8 rows)

UDSF R 代码


SalesTaxCalculator <- function(input.data.frame) {
  # Not a complete list of states in the USA, but enough to get the idea.
  state.sales.tax <- list(ma = 0.0625,
                          az = 0.087,
                          la = 0.0891,
                          tn = 0.0945,
                          wi = 0.0543,
                          ms = 0.0707)
  for ( state_abbreviation in input.data.frame[, 2] ) {
    # Ensure state abbreviations are lowercase.
    lower_state <- tolower(state_abbreviation)
    # Check if the state is in our state.sales.tax list.
    if (is.null(state.sales.tax[[lower_state]])) {
      stop("State is not in our small sample!")
    } else {
      sales.tax.rate <- state.sales.tax[[lower_state]]
      item.price <- input.data.frame[, 1]
      # Calculate the price including sales tax.
      price.with.sales.tax <- (item.price) + (item.price * sales.tax.rate)
    }
  }
  return(price.with.sales.tax)
}

SalesTaxCalculatorFactory <- function() {
  list(name    = SalesTaxCalculator,
       udxtype = c("scalar"),
       intype  = c("float", "varchar"),
       outtype = c("float"))
}

10 - R 示例:kmeans

KMeans_User 标量函数会从表中读取任意数量的列,即观察值。然后,在将 kmeans 群集算法应用于数据时,它会使用观测值和两个参数,以返回与行的群集相关联的整数值。

您可以在 Vertica Github 存储库中找到更多 UDx 示例:https://github.com/vertica/UDx-Examples

加载函数和库

创建库和函数:

=> CREATE OR REPLACE LIBRARY rLib AS 'kmeans.R' LANGUAGE 'R';
CREATE LIBRARY
=> CREATE OR REPLACE FUNCTION KMeans_User AS LANGUAGE 'R' NAME 'KMeans_UserFactory' LIBRARY rLib FENCED;
CREATE FUNCTION

使用函数查询数据

以下查询显示了如何使用 UDSF 运行查询。

=> SELECT spec,
          KMeans_User(sl, sw, pl, pw USING PARAMETERS clusters = 3, nstart = 20)
    FROM iris;
      spec       | KMeans_User
-----------------+-------------
 Iris-setosa     |           2
 Iris-setosa     |           2
 Iris-setosa     |           2
 Iris-setosa     |           2
 Iris-setosa     |           2
 Iris-setosa     |           2
 Iris-setosa     |           2
 Iris-setosa     |           2
 Iris-setosa     |           2
 Iris-setosa     |           2
 Iris-setosa     |           2
.
.
.
(150 rows)

UDSF R 代码


KMeans_User <- function(input.data.frame, parameters.data.frame) {
  # Take the clusters and nstart parameters passed by the user and assign them
  # to variables in the function.
  if ( is.null(parameters.data.frame[['clusters']]) ) {
    stop("NULL value for clusters! clusters cannot be NULL.")
  } else {
    clusters.value <- parameters.data.frame[['clusters']]
  }
  if ( is.null(parameters.data.frame[['nstart']]) ) {
    stop("NULL value for nstart! nstart cannot be NULL.")
  } else {
    nstart.value <- parameters.data.frame[['nstart']]
  }
  # Apply the algorithm to the data.
  kmeans.clusters <- kmeans(input.data.frame[, 1:length(input.data.frame)],
                           clusters.value, nstart = nstart.value)
  final.output <- data.frame(kmeans.clusters$cluster)
  return(final.output)
}

KMeans_UserFactory <- function() {
  list(name    = KMeans_User,
       udxtype = c("scalar"),
       # Since this is a polymorphic function the intype must be any
       intype  = c("any"),
       outtype = c("int"),
       parametertypecallback=KMeansParameters)
}

KMeansParameters <- function() {
  parameters <- list(datatype = c("int", "int"),
                     length   = c("NA", "NA"),
                     scale    = c("NA", "NA"),
                     name     = c("clusters", "nstart"))
  return(parameters)
}

11 - C++ 示例:使用复杂类型

UDx 可以接受和返回复杂类型。ArraySlice 示例将一个数组和两个索引作为输入,返回一个仅包含该范围内的值的数组。由于数组元素可以是任何类型,因此函数是多态的。

完整的源代码位于 /opt/vertica/sdk/examples/ScalarFunctions/ArraySlice.cpp 中。

加载和使用示例

加载库并创建函数,如下所示:

=> CREATE OR REPLACE LIBRARY ScalarFunctions AS '/home/dbadmin/examplesUDSF.so';

=> CREATE FUNCTION ArraySlice AS
LANGUAGE 'C++' NAME 'ArraySliceFactory' LIBRARY ScalarFunctions;

创建一些数据并在其上调用函数,如下所示:

=> CREATE TABLE arrays (id INTEGER, aa ARRAY[INTEGER]);
COPY arrays FROM STDIN;
1|[]
2|[1,2,3]
3|[5,4,3,2,1]
\.

=> CREATE TABLE slices (b INTEGER, e INTEGER);
COPY slices FROM STDIN;
0|2
1|3
2|4
\.

=> SELECT id, b, e, ArraySlice(aa, b, e) AS slice FROM arrays, slices;
 id | b | e | slice
----+---+---+-------
  1 | 0 | 2 | []
  1 | 1 | 3 | []
  1 | 2 | 4 | []
  2 | 0 | 2 | [1,2]
  2 | 1 | 3 | [2,3]
  2 | 2 | 4 | [3]
  3 | 0 | 2 | [5,4]
  3 | 1 | 3 | [4,3]
  3 | 2 | 4 | [3,2]
(9 rows)

工厂实施

由于函数是多态的,getPrototype() 声明输入和输出可以是任何类型,类型强制必须在别处完成:

void getPrototype(ServerInterface &srvInterface,
                              ColumnTypes &argTypes,
                              ColumnTypes &returnType) override
{
    /*
     * This is a polymorphic function that accepts any array
     * and returns an array of the same type
     */
    argTypes.addAny();
    returnType.addAny();
}

工厂验证输入类型并确定 getReturnType() 中的返回类型:

void getReturnType(ServerInterface &srvInterface,
                   const SizedColumnTypes &argTypes,
                   SizedColumnTypes &returnType) override
{
    /*
     * Three arguments: (array, slicebegin, sliceend)
     * Validate manually since the prototype accepts any arguments.
     */
    if (argTypes.size() != 3) {
        vt_report_error(0, "Three arguments (array, slicebegin, sliceend) expected");
    } else if (!argTypes[0].getType().isArrayType()) {
        vt_report_error(1, "Argument 1 is not an array");
    } else if (!argTypes[1].getType().isInt()) {
        vt_report_error(2, "Argument 2 (slicebegin) is not an integer)");
    } else if (!argTypes[2].getType().isInt()) {
        vt_report_error(3, "Argument 3 (sliceend) is not an integer)");
    }

    /* return type is the same as the array arg type, copy it over */
    returnType.push_back(argTypes[0]);
}

函数实施

使用 BlockReaderBlockWriter 调用 processBlock() 方法。第一个实参是一个数组。为了访问数组的元素,该方法使用 ArrayReader。同样,它使用 ArrayWriter 来构造输出。

void processBlock(ServerInterface &srvInterface,
        BlockReader &argReader,
        BlockWriter &resWriter) override
{
    do {
        if (argReader.isNull(0) || argReader.isNull(1) || argReader.isNull(2)) {
            resWriter.setNull();
        } else {
            Array::ArrayReader argArray  = argReader.getArrayRef(0);
            const vint slicebegin = argReader.getIntRef(1);
            const vint sliceend   = argReader.getIntRef(2);

            Array::ArrayWriter outArray = resWriter.getArrayRef(0);
            if (slicebegin < sliceend) {
                for (int i = 0; i < slicebegin && argArray->hasData(); i++) {
                    argArray->next();
                }
                for (int i = slicebegin; i < sliceend && argArray->hasData(); i++) {
                    outArray->copyFromInput(*argArray);
                    outArray->next();
                    argArray->next();
                }
            }
            outArray.commit();  /* finalize the written array elements */
        }
        resWriter.next();
    } while (argReader.next());
}

12 - C++ 示例:返回多个值

编写 UDSF 时,可以指定多个返回值。如果您指定多个值,Vertica 会将它们打包到单个 ROW 作为返回值。您可以查询 ROW 中的字段或整个 ROW。

下面的示例实施一个名为 div(除法)的函数,它返回两个整数,即商和余数。

此示例显示了一种从 UDSF 返回 ROW 的方法。当输入和输出都是基元类型时,返回多个值并让 Vertica 构建 ROW 非常方便。您还可以直接使用复杂类型,如复杂类型作为实参中所述和 C++ 示例:使用复杂类型中所示。

加载和使用示例

加载库并创建函数,如下所示:

=> CREATE OR REPLACE LIBRARY ScalarFunctions AS '/home/dbadmin/examplesUDSF.so';

=> CREATE FUNCTION div AS
LANGUAGE 'C++' NAME 'DivFactory' LIBRARY ScalarFunctions;

创建一些数据并在其上调用函数,如下所示:

=> CREATE TABLE D (a INTEGER, b INTEGER);
COPY D FROM STDIN DELIMITER ',';
10,0
10,1
10,2
10,3
10,4
10,5
\.

=> SELECT a, b, Div(a, b), (Div(a, b)).quotient, (Div(a, b)).remainder FROM D;
 a  | b |                  Div               | quotient | remainder
----+---+------------------------------------+----------+-----------
 10 | 0 | {"quotient":null,"remainder":null} |          |
 10 | 1 | {"quotient":10,"remainder":0}      |       10 |         0
 10 | 2 | {"quotient":5,"remainder":0}       |        5 |         0
 10 | 3 | {"quotient":3,"remainder":1}       |        3 |         1
 10 | 4 | {"quotient":2,"remainder":2}       |        2 |         2
 10 | 5 | {"quotient":2,"remainder":0}       |        2 |         0
(6 rows)

工厂实施

工厂在 getPrototype()getReturnType() 中声明了两个返回值。工厂在其他方面没有作用。


    void getPrototype(ServerInterface &interface,
            ColumnTypes &argTypes,
            ColumnTypes &returnType) override
    {
        argTypes.addInt();
        argTypes.addInt();
        returnType.addInt(); /* quotient  */
        returnType.addInt(); /* remainder */
    }

    void getReturnType(ServerInterface &srvInterface,
            const SizedColumnTypes &argTypes,
            SizedColumnTypes &returnType) override
    {
        returnType.addInt("quotient");
        returnType.addInt("remainder");
    }

函数实施

该函数在 processBlock() 中写入两个输出值。此处值的数量必须与工厂声明相匹配。


class Div : public ScalarFunction {
    void processBlock(Vertica::ServerInterface &srvInterface,
            Vertica::BlockReader &argReader,
            Vertica::BlockWriter &resWriter) override
    {
        do {
            if (argReader.isNull(0) || argReader.isNull(1) || (argReader.getIntRef(1) == 0)) {
                resWriter.setNull(0);
                resWriter.setNull(1);
            } else {
                const vint dividend = argReader.getIntRef(0);
                const vint divisor  = argReader.getIntRef(1);
                resWriter.setInt(0, dividend / divisor);
                resWriter.setInt(1, dividend % divisor);
            }
            resWriter.next();
        } while (argReader.next());
    }
};

13 - C++ 示例:从检查约束调用 UDSF

此示例显示了创建可由检查约束调用的 UDSF 所需的 C++ 代码。示例函数的名称是 LargestSquareBelow。此示例函数确定其平方小于主题列中的数字的最大数字。例如,如果该列中的数字是 1000,则其平方 (961) 小于 1000 的最大数字是 31。

有关检查约束的信息,请参阅检查约束

加载和使用示例

以下示例显示了如何使用 CREATE LIBRARY 创建和加载名为 MySqLib 的库。将此示例中的库路径调整为绝对路径,并将文件名调整为共享对象 LargestSquareBelow 保存到的位置。

创建库:

=> CREATE OR REPLACE LIBRARY MySqLib AS '/home/dbadmin/LargestSquareBelow.so';
  1. 创建并加载库之后,使用 CREATE FUNCTION(标量) 语句将函数添加到编录中:
=> CREATE OR REPLACE FUNCTION largestSqBelow AS LANGUAGE 'C++' NAME 'LargestSquareBelowInfo' LIBRARY MySqLib;
  1. 下一步,将 UDSF 包含到检查约束中。
=> CREATE TABLE squaretest(
   ceiling INTEGER UNIQUE,
   CONSTRAINT chk_sq CHECK (largestSqBelow(ceiling) < ceiling*ceiling)
);
  1. 将数据添加到表 squaretest 中:
=> COPY squaretest FROM stdin DELIMITER ','NULL'null';
-1
null
0
1
1000
1000000
1000001
\.

根据所使用的数据,输出应类似于以下示例:

SELECT ceiling, largestSqBelow(ceiling)
FROM squaretest ORDER BY ceiling;

ceiling  | largestSqBelow
---------+----------------
         |
      -1 |
       0 |
       1 |              0
    1000 |             31
 1000000 |            999
 1000001 |           1000
(7 rows)

ScalarFunction 实施

ScalarFunction 实施为 UDSF 执行处理工作,即确定其平方小于数字输入的最大数字。


#include "Vertica.h"
/*
 * ScalarFunction implementation for a UDSF that
 * determines the largest number whose square is less than
 * the number input.
*/
class LargestSquareBelow : public Vertica::ScalarFunction
{
public:
 /*
  * This function does all of the actual processing for the UDSF.
  * The inputs are retrieved via arg_reader
  * The outputs are returned via arg_writer
  *
 */
    virtual void processBlock(Vertica::ServerInterface &srvInterface,
                              Vertica::BlockReader &arg_reader,
                              Vertica::BlockWriter &res_writer)
    {
        if (arg_reader.getNumCols() != 1)
            vt_report_error(0, "Function only accept 1 argument, but %zu provided", arg_reader.getNumCols());
// While we have input to process
        do {
            // Read the input parameter by calling the
            // BlockReader.getIntRef class function
            const Vertica::vint a = arg_reader.getIntRef(0);
            Vertica::vint res;
            //Determine the largest square below the number
            if ((a != Vertica::vint_null) && (a > 0))
            {
                res = (Vertica::vint)sqrt(a - 1);
            }
            else
                res = Vertica::vint_null;
            //Call BlockWriter.setInt to store the output value,
            //which is the largest square
            res_writer.setInt(res);
            //Write the row and advance to the next output row
            res_writer.next();
            //Continue looping until there are no more input rows
        } while (arg_reader.next());
    }
};

ScalarFunctionFactory 实施

ScalarFunctionFactory 实施执行对输入和输出的处理工作,并将函数标记为不可变(如果计划在检查约束中使用 UDSF,则必须满足此要求)。


class LargestSquareBelowInfo : public Vertica::ScalarFunctionFactory
{
    //return an instance of LargestSquareBelow to perform the computation.
    virtual Vertica::ScalarFunction *createScalarFunction(Vertica::ServerInterface &srvInterface)
    //Call the vt_createFuncObj to create the new LargestSquareBelow class instance.
    { return Vertica::vt_createFuncObject<LargestSquareBelow>(srvInterface.allocator); }

    /*
     * This function returns the description of the input and outputs of the
     * LargestSquareBelow class's processBlock function.  It stores this information in
     * two ColumnTypes objects, one for the input parameter, and one for
     * the return value.
    */
    virtual void getPrototype(Vertica::ServerInterface &srvInterface,
                              Vertica::ColumnTypes &argTypes,
                              Vertica::ColumnTypes &returnType)
    {
        // Takes one int as input, so adds int to the argTypes object
        argTypes.addInt();
        // Returns a single int, so add a single int to the returnType object.
        // ScalarFunctions always return a single value.
        returnType.addInt();
    }
public:
    // the function cannot be called within a check constraint unless the UDx author
    // certifies that the function is immutable:
    LargestSquareBelowInfo() { vol = Vertica::IMMUTABLE; }
};

RegisterFactory 宏

使用 RegisterFactory 宏注册一个 ScalarFunctionFactory 子类。该宏将对工厂类进行实例化并将其包含的元数据供 Vertica 访问。要调用该宏,请将您的工厂类的名称传递给它。


RegisterFactory(LargestSquareBelowInfo);