这是本节的多页打印视图。
点击此处打印.
返回本页常规视图.
标量函数 (UDSF)
用户定义的标量函数 (UDSF) 为读取的每个数据行返回单个值。您可以在任何可使用内置 Vertica 函数的地方使用 UDSF。您通常可以开发 UDSF 来执行在使用 SQL 语句和函数时因过于复杂或速度过慢而难以执行的数据处理。UDSF 还可让您在 Vertica 中使用由第三方库提供的分析函数,并同时保持高性能。
UDSF 返回单个列。您可以在 ROW 中自动返回多个值。ROW 是一组属性值对。在以下示例中,div_with_rem 是一个执行除法运算的 UDSF,将商和余数作为整数返回:
=> SELECT div_with_rem(18,5);
div_with_rem
------------------------------
{"quotient":3,"remainder":3}
(1 row)
从 UDSF 返回的 ROW 不能用作 COUNT 的实参。
或者,您可以自己构造一个复杂的返回值,如复杂类型作为实参中所述。
UDSF 必须为每个输入行返回值(除非生成错误;有关详细信息,请参阅处理错误)。未能为输入行返回值会导致不正确的结果,如果未在 隔离和非隔离模式 中运行,可能会破坏 Vertica 服务器的稳定性。
一个 UDSF 最多可以有 9800 个实参。
1 - ScalarFunction 类
ScalarFunction
类是 UDSF 的核心。子类必须定义用于执行标量操作的 processBlock()
方法。该类可定义多种方法以设置和分解该函数。
对于使用 C++ 语言编写的标量函数,您可以提供有助于优化查询的信息。请参阅提高查询性能(仅限 C++)。
执行操作
processBlock()
方法可执行您希望 UDSF 执行的所有处理。当用户在 SQL 语句中调用您的函数时,Vertica 会将来自函数参数的数据绑定在一起并将其传递至 processBlock()
。
processBlock()
方法的输入和输出由 BlockReader
和 BlockWriter
类的对象提供。这些类定义了用于为 UDSF 读取输入数据和写入输出数据的方法。
开发 UDSF 的大部分工作是编写 processBlock()
。对函数的所有处理在此阶段进行。UDSF 应遵循以下基本模式:
-
使用特定于数据类型的方法从 BlockReader
对象读入一组实参。
-
以某种方式处理数据。
-
使用 BlockWriter
类的特定于数据类型的方法之一输出结果值。
-
通过调用 BlockWriter.next()
和 BlockReader.next()
分别前进到下一个输出行和下一个输入行。
此过程将持续到没有更多数据行可供读取为止(BlockReader.next()
将返回 false)。
您必须确保 processBlock()
读取所有输入行并为每个行输出单个值。如未遵守此规则,将损坏 Vertica 为获取 UDSF 的输出而读取的数据结构。此规则的唯一例外出现在 processBlock()
函数将错误报告回 Vertica 时(请参阅处理错误)。在这种情况下,Vertica 不会尝试读取由 UDSF 生成的不完整结果集。
设置和分解
ScalarFunction
类定义了两种其他方法,您可以选择性地实施这两种方法以分配和释放资源: setup()
和 destroy()
。您应使用这些方法来分配和取消分配那些不通过 UDx API 分配的资源(有关详细信息,请参阅为 UDx 分配资源)。
注意
-
虽然为 ScalarFunction
子类选择的名称不必与稍后为其分配的 SQL 函数名称匹配,但 Vertica 会将名称设为相同视为最佳实践。
-
请勿假设系统会从将函数实例化的同一个线程调用该函数。
-
可以调用 ScalarFunction
子类的同一个实例以处理多个数据块。
-
不保证发送到 processBlock()
的输入行具有特定顺序。
-
写入太多输出行会导致 Vertica 发出超过边界错误。
API
ScalarFunction API 提供了以下通过子类扩展的方法:
virtual void setup(ServerInterface &srvInterface,
const SizedColumnTypes &argTypes);
virtual void processBlock(ServerInterface &srvInterface,
BlockReader &arg_reader, BlockWriter &res_writer)=0;
virtual void getOutputRange (ServerInterface &srvInterface,
ValueRangeReader &inRange, ValueRangeWriter &outRange)
virtual void cancel(ServerInterface &srvInterface);
virtual void destroy(ServerInterface &srvInterface, const SizedColumnTypes &argTypes);
ScalarFunction API 提供了以下通过子类扩展的方法:
public void setup(ServerInterface srvInterface, SizedColumnTypes argTypes);
public abstract void processBlock(ServerInterface srvInterface, BlockReader arg_reader,
BlockWriter res_writer) throws UdfException, DestroyInvocation;
protected void cancel(ServerInterface srvInterface);
public void destroy(ServerInterface srvInterface, SizedColumnTypes argTypes);
ScalarFunction API 提供了以下通过子类扩展的方法:
def setup(self, server_interface, col_types)
def processBlock(self, server_interface, block_reader, block_writer)
def destroy(self, server_interface, col_types)
实施 主函数 API 以定义标量函数:
FunctionName <- function(input.data.frame, parameters.data.frame) {
# Computations
# The function must return a data frame.
return(output.data.frame)
}
2 - ScalarFunctionFactory 类
ScalarFunctionFactory
类将向 Vertica 提供有关 UDSF 的元数据:其参数数量和数据类型及其返回值的数据类型。该类还会实例化 ScalarFunction
的子类。
方法
您必须在 ScalarFunctionFactory
子类中实施以下方法:
-
createScalarFunction()
实例化 ScalarFunction
子类。如果采用 C++ 进行编写,则您可以使用 ScalarFunction
子类的名称调用 vt_createFuncObj
宏。此宏会为您分配类并将该类实例化。
-
getPrototype()
会向 Vertica 提供有关 UDSF 的参数和返回类型。除了一个 ServerInterface
对象之外,此方法还获取另外两个 ColumnTypes
对象。在此函数中唯一需要执行的操作是,对这两个对象调用类函数以构建参数列表和返回值类型。如果返回多个值,则将结果打包为 ROW 类型。
定义工厂类之后,您需要调用 RegisterFactory
宏。此宏可将工厂类的成员实例化,以便 Vertica 可以与该成员交互并提取其中包含的有关 UDSF 的元数据。
声明返回值
如果函数返回特定大小的列(一种长度可变的返回数据类型,例如 VARCHAR)、需要精度的值或多个值,则必须实施 getReturnType()
。此方法由 Vertica 调用,以用于查找在每个结果行中返回的数据的长度和精度。此方法的返回值取决于 processBlock()
方法所返回的数据类型:
-
CHAR、(LONG) VARCHAR、BINARY 和 (LONG) VARBINARY 将返回最大长度。
-
NUMERIC 类型可指定精度和小数位数。
-
TIME 和 TIMESTAMP 值可指定精度(无论是否带有时区均可)。
-
INTERVAL YEAR TO MONTH 可指定范围。
-
INTERVAL DAY TO SECOND 可指定精度和范围。
-
ARRAY 类型将指定元素的最大数量。
如果 UDSF 不返回以上数据类型之一,而返回单个值,则它不需要实施 getReturnType()
方法。
传递到 getReturnType()
方法的输入是一个 SizedColumnTypes
对象,其中包含输入参数类型及其长度。此对象将传递到 processBlock()
函数的一个实例。getReturnType()
的实施必须从该输入提取数据类型和长度,并确定输出行的长度或精度,然后将此信息保存在 SizedColumnTypes
类的另一个实例中。
API
ScalarFunctionFactory API 提供了以下通过子类扩展的方法:
virtual ScalarFunction * createScalarFunction(ServerInterface &srvInterface)=0;
virtual void getPrototype(ServerInterface &srvInterface,
ColumnTypes &argTypes, ColumnTypes &returnType)=0;
virtual void getReturnType(ServerInterface &srvInterface,
const SizedColumnTypes &argTypes, SizedColumnTypes &returnType);
virtual void getParameterType(ServerInterface &srvInterface,
SizedColumnTypes ¶meterTypes);
ScalarFunctionFactory API 提供了以下通过子类扩展的方法:
public abstract ScalarFunction createScalarFunction(ServerInterface srvInterface);
public abstract void getPrototype(ServerInterface srvInterface, ColumnTypes argTypes, ColumnTypes returnType);
public void getReturnType(ServerInterface srvInterface, SizedColumnTypes argTypes,
SizedColumnTypes returnType) throws UdfException;
public void getParameterType(ServerInterface srvInterface, SizedColumnTypes parameterTypes);
ScalarFunctionFactory API 提供了以下通过子类扩展的方法:
def createScalarFunction(self, srv)
def getPrototype(self, srv_interface, arg_types, return_type)
def getReturnType(self, srv_interface, arg_types, return_type)
实施 工厂函数 API 以定义标量函数工厂:
FunctionNameFactory <- function() {
list(name = FunctionName,
udxtype = c("scalar"),
intype = c("int"),
outtype = c("int"))
}
3 - 设置 null 输入和波动性行为
一般情况下,Vertica 会为查询中的每个数据行调用 UDSF。在某些情况下,Vertica 可以避免执行您的 UDSF。您可以告知 Vertica 何时能够跳过函数调用,而只需通过更改函数的波动性和严格性设置来提供返回值本身。
-
函数的波动性 是指在传递相同的参数时,它是否始终返回相同的输出值。根据其行为,Vertica 可以缓存实参和返回值。如果用户使用相同的实参集来调用 UDSF,Vertica 会返回缓存值,而不会调用 UDSF。
-
函数的严格性 是指它如何对 NULL 实参作出响应。如果“任何”实参为 NULL 时它始终返回 NULL,则 Vertica 可以只返回 NULL,而无需调用函数。此优化还可以节省工作量,因为您无需在 UDSF 代码中测试并处理 Null 参数。
可以通过在 ScalarFunctionFactory
类的构造函数中设置 vol
和 strict
字段来指示函数的可变性和空值处理。
可变性设置
要指示函数的可变性,请将 vol
字段设置为以下值之一:
示例
以下示例显示了使函数不可变的 Add2ints
示例工厂类的某个版本。
class Add2intsImmutableFactory : public Vertica::ScalarFunctionFactory
{
virtual Vertica::ScalarFunction *createScalarFunction(Vertica::ServerInterface &srvInterface)
{ return vt_createFuncObj(srvInterface.allocator, Add2ints); }
virtual void getPrototype(Vertica::ServerInterface &srvInterface,
Vertica::ColumnTypes &argTypes,
Vertica::ColumnTypes &returnType)
{
argTypes.addInt();
argTypes.addInt();
returnType.addInt();
}
public:
Add2intsImmutableFactory() {vol = IMMUTABLE;}
};
RegisterFactory(Add2intsImmutableFactory);
以下示例演示了将 Add2IntsFactory
的 vol
字段设置为 IMMUTABLE 以向 Vertica 说明可以缓存实参和返回值。
public class Add2IntsFactory extends ScalarFunctionFactory {
@Override
public void getPrototype(ServerInterface srvInterface, ColumnTypes argTypes, ColumnTypes returnType){
argTypes.addInt();
argTypes.addInt();
returnType.addInt();
}
@Override
public ScalarFunction createScalarFunction(ServerInterface srvInterface){
return new Add2Ints();
}
// Class constructor
public Add2IntsFactory() {
// Tell Vertica that the same set of arguments will always result in the
// same return value.
vol = volatility.IMMUTABLE;
}
}
Null 输入行为
要指示函数如何响应 Null 输入,请将 strictness
字段设置为以下值之一:
示例
以下 C++ 示例演示了设置 Add2ints 的 null 行为以使 Vertica 不调用带有 NULL 值的函数。
class Add2intsNullOnNullInputFactory : public Vertica::ScalarFunctionFactory
{
virtual Vertica::ScalarFunction *createScalarFunction(Vertica::ServerInterface &srvInterface)
{ return vt_createFuncObj(srvInterface.allocator, Add2ints); }
virtual void getPrototype(Vertica::ServerInterface &srvInterface,
Vertica::ColumnTypes &argTypes,
Vertica::ColumnTypes &returnType)
{
argTypes.addInt();
argTypes.addInt();
returnType.addInt();
}
public:
Add2intsNullOnNullInputFactory() {strict = RETURN_NULL_ON_NULL_INPUT;}
};
RegisterFactory(Add2intsNullOnNullInputFactory);
4 - 提高查询性能(仅限 C++)
在评估查询时,Vertica 可以利用有关值范围的可用信息。例如,如果数据已分区并且查询通过分区值限制输出,则 Vertica 可以忽略不可能包含满足查询的数据的分区。同样,对于标量函数,Vertica 可以不处理数据中函数返回的值不可能影响结果的行。
考虑一个表(表中包含数百万行客户订单数据)和一个标量函数(该函数计算为订单中的所有项目支付的总价格)。查询使用 WHERE 子句将结果限制为高于给定值的订单。对数据块调用标量函数;如果该块中没有行可以产生目标值,则跳过该块的处理可以提高查询性能。
用 C++ 编写的标量函数可以实施 getOutputRange
方法。在调用 processBlock
之前,Vertica 调用 getOutputRange
以在给定输入范围的情况下确定此块的最小和最大返回值。然后,它决定是否调用 processBlock
来执行计算。
Add2Ints 示例实施了此函数。最小输出值是两个输入的最小值之和,最大输出值是每个输入的最大值之和。此函数不考虑单个行。考虑以下输入:
a | b
------+------
21 | 92
500 | 19
111 | 11
两个输入的最小值是 21 和 11,因此函数将 32 报告为输出范围的下限。最大输入值为 500 和 92,因此它将 592 报告为输出范围的上限。任何输入行的返回值均大于 32,小于 592。
getOutputRange
的目的是快速消除输出肯定超出范围的调用。例如,如果查询包含 "WHERE Add2Ints(a,b) > 600",则可以跳过该数据块。仍然可能存在调用 getOutputRange
后 processBlock
没有返回结果的情况。如果查询包含 "WHERE Add2Ints(a,b) > 500",则 getOutputRange
不会消除此数据块。
Add2Ints 实施 getOutputRange
,如下所示:
/*
* This method computes the output range for this scalar function from
* the ranges of its inputs in a single invocation.
*
* The input ranges are retrieved via inRange
* The output range is returned via outRange
*/
virtual void getOutputRange(Vertica::ServerInterface &srvInterface,
Vertica::ValueRangeReader &inRange,
Vertica::ValueRangeWriter &outRange)
{
if (inRange.hasBounds(0) && inRange.hasBounds(1)) {
// Input ranges have bounds defined
if (inRange.isNull(0) || inRange.isNull(1)) {
// At least one range has only NULL values.
// Output range can only have NULL values.
outRange.setNull();
outRange.setHasBounds();
return;
} else {
// Compute output range
const vint& a1LoBound = inRange.getIntRefLo(0);
const vint& a2LoBound = inRange.getIntRefLo(1);
outRange.setIntLo(a1LoBound + a2LoBound);
const vint& a1UpBound = inRange.getIntRefUp(0);
const vint& a2UpBound = inRange.getIntRefUp(1);
outRange.setIntUp(a1UpBound + a2UpBound);
}
} else {
// Input ranges are unbounded. No output range can be defined
return;
}
if (!inRange.canHaveNulls(0) && !inRange.canHaveNulls(1)) {
// There cannot be NULL values in the output range
outRange.setCanHaveNulls(false);
}
// Let Vertica know that the output range is bounded
outRange.setHasBounds();
}
如果 getOutputRange
产生错误,Vertica 会发出警告并且不会为当前查询再次调用该方法。
5 - C++ 示例:Add2Ints
以下示例显示了基本的 ScalarFunction
子类,它名为 Add2ints
。顾名思义,它将两个整数相加,并返回单个整数结果。
有关完整的源代码,请参阅/opt/vertica/sdk/examples/ScalarFunctions/Add2Ints.cpp
。此 UDx 的 Java 和 Python 版本包含在 /opt/vertica/sdk/examples
中。
加载和使用示例
使用 CREATE LIBRARY 加载包含函数的库,然后按下例所示使用 CREATE FUNCTION(标量) 声明该函数:
=> CREATE LIBRARY ScalarFunctions AS '/home/dbadmin/examples/ScalarFunctions.so';
=> CREATE FUNCTION add2ints AS LANGUAGE 'C++' NAME 'Add2IntsFactory' LIBRARY ScalarFunctions;
以下示例显示了如何使用该函数:
=> SELECT Add2Ints(27,15);
Add2ints
----------
42
(1 row)
=> SELECT * FROM MyTable;
a | b
-----+----
7 | 0
12 | 2
12 | 6
18 | 9
1 | 1
58 | 4
450 | 15
(7 rows)
=> SELECT * FROM MyTable WHERE Add2ints(a, b) > 20;
a | b
-----+----
18 | 9
58 | 4
450 | 15
(3 rows)
函数实施
标量函数在 processBlock
方法中进行计算:
class Add2Ints : public ScalarFunction
{
public:
/*
* This method processes a block of rows in a single invocation.
*
* The inputs are retrieved via argReader
* The outputs are returned via resWriter
*/
virtual void processBlock(ServerInterface &srvInterface,
BlockReader &argReader,
BlockWriter &resWriter)
{
try {
// While we have inputs to process
do {
if (argReader.isNull(0) || argReader.isNull(1)) {
resWriter.setNull();
} else {
const vint a = argReader.getIntRef(0);
const vint b = argReader.getIntRef(1);
resWriter.setInt(a+b);
}
resWriter.next();
} while (argReader.next());
} catch(std::exception& e) {
// Standard exception. Quit.
vt_report_error(0, "Exception while processing block: [%s]", e.what());
}
}
// ...
};
实施 getOutputRange
,这是可选的,允许您的函数跳过结果不在目标范围内的行。例如,如果 WHERE 子句将查询结果限制在某个范围内,则无需为不可能在该范围内的情况调用该函数。
/*
* This method computes the output range for this scalar function from
* the ranges of its inputs in a single invocation.
*
* The input ranges are retrieved via inRange
* The output range is returned via outRange
*/
virtual void getOutputRange(Vertica::ServerInterface &srvInterface,
Vertica::ValueRangeReader &inRange,
Vertica::ValueRangeWriter &outRange)
{
if (inRange.hasBounds(0) && inRange.hasBounds(1)) {
// Input ranges have bounds defined
if (inRange.isNull(0) || inRange.isNull(1)) {
// At least one range has only NULL values.
// Output range can only have NULL values.
outRange.setNull();
outRange.setHasBounds();
return;
} else {
// Compute output range
const vint& a1LoBound = inRange.getIntRefLo(0);
const vint& a2LoBound = inRange.getIntRefLo(1);
outRange.setIntLo(a1LoBound + a2LoBound);
const vint& a1UpBound = inRange.getIntRefUp(0);
const vint& a2UpBound = inRange.getIntRefUp(1);
outRange.setIntUp(a1UpBound + a2UpBound);
}
} else {
// Input ranges are unbounded. No output range can be defined
return;
}
if (!inRange.canHaveNulls(0) && !inRange.canHaveNulls(1)) {
// There cannot be NULL values in the output range
outRange.setCanHaveNulls(false);
}
// Let Vertica know that the output range is bounded
outRange.setHasBounds();
}
工厂实施
工厂实例化了类的一个成员 (createScalarFunction
),并且还描述了函数的输入和输出 (getPrototype
):
class Add2IntsFactory : public ScalarFunctionFactory
{
// return an instance of Add2Ints to perform the actual addition.
virtual ScalarFunction *createScalarFunction(ServerInterface &interface)
{ return vt_createFuncObject<Add2Ints>(interface.allocator); }
// This function returns the description of the input and outputs of the
// Add2Ints class's processBlock function. It stores this information in
// two ColumnTypes objects, one for the input parameters, and one for
// the return value.
virtual void getPrototype(ServerInterface &interface,
ColumnTypes &argTypes,
ColumnTypes &returnType)
{
argTypes.addInt();
argTypes.addInt();
// Note that ScalarFunctions *always* return a single value.
returnType.addInt();
}
};
RegisterFactory 宏
使用 RegisterFactory
宏注册一个 UDx。该宏将对工厂类进行实例化并将其包含的元数据供 Vertica 访问。要调用该宏,请将您的工厂类的名称传递给它。
RegisterFactory(Add2IntsFactory);
6 - Python 示例:currency_convert
currency_convert
标量函数从表中读取两个值,即币种和价值。然后它将项目的价值转换为美元,返回一个浮点结果。
您可以在 Vertica Github 存储库中找到更多 UDx 示例:https://github.com/vertica/UDx-Examples。
UDSF Python 代码
import vertica_sdk
import decimal
rates2USD = {'USD': 1.000,
'EUR': 0.89977,
'GBP': 0.68452,
'INR': 67.0345,
'AUD': 1.39187,
'CAD': 1.30335,
'ZAR': 15.7181,
'XXX': -1.0000}
class currency_convert(vertica_sdk.ScalarFunction):
"""Converts a money column to another currency
Returns a value in USD.
"""
def __init__(self):
pass
def setup(self, server_interface, col_types):
pass
def processBlock(self, server_interface, block_reader, block_writer):
while(True):
currency = block_reader.getString(0)
try:
rate = decimal.Decimal(rates2USD[currency])
except KeyError:
server_interface.log("ERROR: {} not in dictionary.".format(currency))
# Scalar functions always need a value to move forward to the
# next input row. Therefore, we need to assign it a value to
# move beyond the error.
currency = 'XXX'
rate = decimal.Decimal(rates2USD[currency])
starting_value = block_reader.getNumeric(1)
converted_value = decimal.Decimal(starting_value / rate)
block_writer.setNumeric(converted_value)
block_writer.next()
if not block_reader.next():
break
def destroy(self, server_interface, col_types):
pass
class currency_convert_factory(vertica_sdk.ScalarFunctionFactory):
def createScalarFunction(self, srv):
return currency_convert()
def getPrototype(self, srv_interface, arg_types, return_type):
arg_types.addVarchar()
arg_types.addNumeric()
return_type.addNumeric()
def getReturnType(self, srv_interface, arg_types, return_type):
return_type.addNumeric(9,4)
加载函数和库
创建库和函数。
=> CREATE LIBRARY pylib AS '/home/dbadmin/python_udx/currency_convert/currency_convert.py' LANGUAGE 'Python';
CREATE LIBRARY
=> CREATE FUNCTION currency_convert AS LANGUAGE 'Python' NAME 'currency_convert_factory' LIBRARY pylib fenced;
CREATE FUNCTION
使用函数查询数据
以下查询显示了如何使用 UDSF 运行查询。
=> SELECT product, currency_convert(currency, value) AS cost_in_usd
FROM items;
product | cost_in_usd
--------------+-------------
Shoes | 133.4008
Soccer Ball | 110.2817
Coffee | 13.5190
Surfboard | 176.2593
Hockey Stick | 76.7177
Car | 17000.0000
Software | 10.4424
Hamburger | 7.5000
Fish | 130.4272
Cattle | 269.2367
(10 rows)
7 - Python 示例:validate_url
validate_url
标量函数会从表中读取字符串,即 URL。然后,它会验证该 URL 是否会做出响应,以返回状态代码或指示尝试失败的字符串。
您可以在 Vertica Github 存储库中找到更多 UDx 示例:https://github.com/vertica/UDx-Examples。
UDSF Python 代码
import vertica_sdk
import urllib.request
import time
class validate_url(vertica_sdk.ScalarFunction):
"""Validates HTTP requests.
Returns the status code of a webpage. Pages that cannot be accessed return
"Failed to load page."
"""
def __init__(self):
pass
def setup(self, server_interface, col_types):
pass
def processBlock(self, server_interface, arg_reader, res_writer):
# Writes a string to the UDx log file.
server_interface.log("Validating webpage accessibility - UDx")
while(True):
url = arg_reader.getString(0)
try:
status = urllib.request.urlopen(url).getcode()
# Avoid overwhelming web servers -- be nice.
time.sleep(2)
except (ValueError, urllib.error.HTTPError, urllib.error.URLError):
status = 'Failed to load page'
res_writer.setString(str(status))
res_writer.next()
if not arg_reader.next():
# Stop processing when there are no more input rows.
break
def destroy(self, server_interface, col_types):
pass
class validate_url_factory(vertica_sdk.ScalarFunctionFactory):
def createScalarFunction(self, srv):
return validate_url()
def getPrototype(self, srv_interface, arg_types, return_type):
arg_types.addVarchar()
return_type.addChar()
def getReturnType(self, srv_interface, arg_types, return_type):
return_type.addChar(20)
加载函数和库
创建库和函数。
=> CREATE OR REPLACE LIBRARY pylib AS 'webpage_tester/validate_url.py' LANGUAGE 'Python';
=> CREATE OR REPLACE FUNCTION validate_url AS LANGUAGE 'Python' NAME 'validate_url_factory' LIBRARY pylib fenced;
使用函数查询数据
以下查询显示了如何使用 UDSF 运行查询。
=> SELECT url, validate_url(url) AS url_status FROM webpages;
url | url_status
-----------------------------------------------+----------------------
http://www.vertica.com/documentation/vertica/ | 200
http://www.google.com/ | 200
http://www.mass.gov.com/ | Failed to load page
http://www.espn.com | 200
http://blah.blah.blah.blah | Failed to load page
http://www.microfocus.com/ | 200
(6 rows)
8 - Python 示例:矩阵乘法
Python UDx 可以接受并返回复杂类型。MatrixMultiply
类会乘以输入矩阵,并返回生成的矩阵乘积。这些矩阵将以二维数组来表示。为了执行矩阵乘法运算,第一个输入矩阵中的列数必须等于第二个输入矩阵中的行数。
完整的源代码位于 /opt/vertica/sdk/examples/python/ScalarFunctions.py
中。
加载和使用示例
加载库并创建函数,如下所示:
=> CREATE OR REPLACE LIBRARY ScalarFunctions AS '/home/dbadmin/examples/python/ScalarFunctions.py' LANGUAGE 'Python';
=> CREATE FUNCTION MatrixMultiply AS LANGUAGE 'Python' NAME 'matrix_multiply_factory' LIBRARY ScalarFunctions;
您可以创建输入矩阵,然后调用诸如以下函数:
=> CREATE TABLE mn (id INTEGER, data ARRAY[ARRAY[INTEGER, 3], 2]);
CREATE TABLE
=> CREATE TABLE np (id INTEGER, data ARRAY[ARRAY[INTEGER, 2], 3]);
CREATE TABLE
=> COPY mn FROM STDIN PARSER fjsonparser();
{"id": 1, "data": [[1, 2, 3], [4, 5, 6]] }
{"id": 2, "data": [[7, 8, 9], [10, 11, 12]] }
\.
=> COPY np FROM STDIN PARSER fjsonparser();
{"id": 1, "data": [[0, 0], [0, 0], [0, 0]] }
{"id": 2, "data": [[1, 1], [1, 1], [1, 1]] }
{"id": 3, "data": [[2, 0], [0, 2], [2, 0]] }
\.
=> SELECT mn.id, np.id, MatrixMultiply(mn.data, np.data) FROM mn CROSS JOIN np ORDER BY 1, 2;
id | id | MatrixMultiply
---+----+-------------------
1 | 1 | [[0,0],[0,0]]
1 | 2 | [[6,6],[15,15]]
1 | 3 | [[8,4],[20,10]]
2 | 1 | [[0,0],[0,0]]
2 | 2 | [[24,24],[33,33]]
2 | 3 | [[32,16],[44,22]]
(6 rows)
设置
所有 Python UDx 都必须导入 Vertica SDK 库:
工厂实施
getPrototype()
方法会声明函数实参和返回类型都必须为二维数组,以整数数组的数组来表示:
def getPrototype(self, srv_interface, arg_types, return_type):
array1dtype = vertica_sdk.ColumnTypes.makeArrayType(vertica_sdk.ColumnTypes.makeInt())
arg_types.addArrayType(array1dtype)
arg_types.addArrayType(array1dtype)
return_type.addArrayType(array1dtype)
getReturnType()
验证乘积矩阵的行数是否与第一个输入矩阵相同,以及列数是否与第二个输入矩阵相同:
def getReturnType(self, srv_interface, arg_types, return_type):
(_, a1type) = arg_types[0]
(_, a2type) = arg_types[1]
m = a1type.getArrayBound()
p = a2type.getElementType().getArrayBound()
return_type.addArrayType(vertica_sdk.SizedColumnTypes.makeArrayType(vertica_sdk.SizedColumnTypes.makeInt(), p), m)
函数实施
使用名称分别为 arg_reader
和 res_writer
的 BlockReader
和 BlockWriter
来调用 processBlock()
方法。为了访问输入数组的元素,该方法会使用 ArrayReader
实例。数组是嵌套的,因此必须为外部和内部数组实例化 ArrayReader
。列表推导式简化了将输入数组读取到列表中的过程。该方法会执行计算,然后使用 ArrayWriter
实例来构造乘积矩阵。
def processBlock(self, server_interface, arg_reader, res_writer):
while True:
lmat = [[cell.getInt(0) for cell in row.getArrayReader(0)] for row in arg_reader.getArrayReader(0)]
rmat = [[cell.getInt(0) for cell in row.getArrayReader(0)] for row in arg_reader.getArrayReader(1)]
omat = [[0 for c in range(len(rmat[0]))] for r in range(len(lmat))]
for i in range(len(lmat)):
for j in range(len(rmat[0])):
for k in range(len(rmat)):
omat[i][j] += lmat[i][k] * rmat[k][j]
res_writer.setArray(omat)
res_writer.next()
if not arg_reader.next():
break
9 - R 示例: SalesTaxCalculator
SalesTaxCalculator
标量函数会从表中读取浮点数和可变长字符串,即商品的价格和州名缩写。然后,它会使用州名缩写从列表中查找销售税率并计算商品的价格(包括所在州的销售税),以返回商品的总成本。
您可以在 Vertica Github 存储库中找到更多 UDx 示例:https://github.com/vertica/UDx-Examples。
加载函数和库
创建库和函数。
=> CREATE OR REPLACE LIBRARY rLib AS 'sales_tax_calculator.R' LANGUAGE 'R';
CREATE LIBRARY
=> CREATE OR REPLACE FUNCTION SalesTaxCalculator AS LANGUAGE 'R' NAME 'SalesTaxCalculatorFactory' LIBRARY rLib FENCED;
CREATE FUNCTION
使用函数查询数据
以下查询显示了如何使用 UDSF 运行查询。
=> SELECT item, state_abbreviation,
price, SalesTaxCalculator(price, state_abbreviation) AS Price_With_Sales_Tax
FROM inventory;
item | state_abbreviation | price | Price_With_Sales_Tax
-------------+--------------------+-------+---------------------
Scarf | AZ | 6.88 | 7.53016
Software | MA | 88.31 | 96.655295
Soccer Ball | MS | 12.55 | 13.735975
Beads | LA | 0.99 | 1.083555
Baseball | TN | 42.42 | 46.42869
Cheese | WI | 20.77 | 22.732765
Coffee Mug | MA | 8.99 | 9.839555
Shoes | TN | 23.99 | 26.257055
(8 rows)
UDSF R 代码
SalesTaxCalculator <- function(input.data.frame) {
# Not a complete list of states in the USA, but enough to get the idea.
state.sales.tax <- list(ma = 0.0625,
az = 0.087,
la = 0.0891,
tn = 0.0945,
wi = 0.0543,
ms = 0.0707)
for ( state_abbreviation in input.data.frame[, 2] ) {
# Ensure state abbreviations are lowercase.
lower_state <- tolower(state_abbreviation)
# Check if the state is in our state.sales.tax list.
if (is.null(state.sales.tax[[lower_state]])) {
stop("State is not in our small sample!")
} else {
sales.tax.rate <- state.sales.tax[[lower_state]]
item.price <- input.data.frame[, 1]
# Calculate the price including sales tax.
price.with.sales.tax <- (item.price) + (item.price * sales.tax.rate)
}
}
return(price.with.sales.tax)
}
SalesTaxCalculatorFactory <- function() {
list(name = SalesTaxCalculator,
udxtype = c("scalar"),
intype = c("float", "varchar"),
outtype = c("float"))
}
10 - R 示例:kmeans
KMeans_User
标量函数会从表中读取任意数量的列,即观察值。然后,在将 kmeans 群集算法应用于数据时,它会使用观测值和两个参数,以返回与行的群集相关联的整数值。
您可以在 Vertica Github 存储库中找到更多 UDx 示例:https://github.com/vertica/UDx-Examples。
加载函数和库
创建库和函数:
=> CREATE OR REPLACE LIBRARY rLib AS 'kmeans.R' LANGUAGE 'R';
CREATE LIBRARY
=> CREATE OR REPLACE FUNCTION KMeans_User AS LANGUAGE 'R' NAME 'KMeans_UserFactory' LIBRARY rLib FENCED;
CREATE FUNCTION
使用函数查询数据
以下查询显示了如何使用 UDSF 运行查询。
=> SELECT spec,
KMeans_User(sl, sw, pl, pw USING PARAMETERS clusters = 3, nstart = 20)
FROM iris;
spec | KMeans_User
-----------------+-------------
Iris-setosa | 2
Iris-setosa | 2
Iris-setosa | 2
Iris-setosa | 2
Iris-setosa | 2
Iris-setosa | 2
Iris-setosa | 2
Iris-setosa | 2
Iris-setosa | 2
Iris-setosa | 2
Iris-setosa | 2
.
.
.
(150 rows)
UDSF R 代码
KMeans_User <- function(input.data.frame, parameters.data.frame) {
# Take the clusters and nstart parameters passed by the user and assign them
# to variables in the function.
if ( is.null(parameters.data.frame[['clusters']]) ) {
stop("NULL value for clusters! clusters cannot be NULL.")
} else {
clusters.value <- parameters.data.frame[['clusters']]
}
if ( is.null(parameters.data.frame[['nstart']]) ) {
stop("NULL value for nstart! nstart cannot be NULL.")
} else {
nstart.value <- parameters.data.frame[['nstart']]
}
# Apply the algorithm to the data.
kmeans.clusters <- kmeans(input.data.frame[, 1:length(input.data.frame)],
clusters.value, nstart = nstart.value)
final.output <- data.frame(kmeans.clusters$cluster)
return(final.output)
}
KMeans_UserFactory <- function() {
list(name = KMeans_User,
udxtype = c("scalar"),
# Since this is a polymorphic function the intype must be any
intype = c("any"),
outtype = c("int"),
parametertypecallback=KMeansParameters)
}
KMeansParameters <- function() {
parameters <- list(datatype = c("int", "int"),
length = c("NA", "NA"),
scale = c("NA", "NA"),
name = c("clusters", "nstart"))
return(parameters)
}
11 - C++ 示例:使用复杂类型
UDx 可以接受和返回复杂类型。ArraySlice
示例将一个数组和两个索引作为输入,返回一个仅包含该范围内的值的数组。由于数组元素可以是任何类型,因此函数是多态的。
完整的源代码位于 /opt/vertica/sdk/examples/ScalarFunctions/ArraySlice.cpp
中。
加载和使用示例
加载库并创建函数,如下所示:
=> CREATE OR REPLACE LIBRARY ScalarFunctions AS '/home/dbadmin/examplesUDSF.so';
=> CREATE FUNCTION ArraySlice AS
LANGUAGE 'C++' NAME 'ArraySliceFactory' LIBRARY ScalarFunctions;
创建一些数据并在其上调用函数,如下所示:
=> CREATE TABLE arrays (id INTEGER, aa ARRAY[INTEGER]);
COPY arrays FROM STDIN;
1|[]
2|[1,2,3]
3|[5,4,3,2,1]
\.
=> CREATE TABLE slices (b INTEGER, e INTEGER);
COPY slices FROM STDIN;
0|2
1|3
2|4
\.
=> SELECT id, b, e, ArraySlice(aa, b, e) AS slice FROM arrays, slices;
id | b | e | slice
----+---+---+-------
1 | 0 | 2 | []
1 | 1 | 3 | []
1 | 2 | 4 | []
2 | 0 | 2 | [1,2]
2 | 1 | 3 | [2,3]
2 | 2 | 4 | [3]
3 | 0 | 2 | [5,4]
3 | 1 | 3 | [4,3]
3 | 2 | 4 | [3,2]
(9 rows)
工厂实施
由于函数是多态的,getPrototype()
声明输入和输出可以是任何类型,类型强制必须在别处完成:
void getPrototype(ServerInterface &srvInterface,
ColumnTypes &argTypes,
ColumnTypes &returnType) override
{
/*
* This is a polymorphic function that accepts any array
* and returns an array of the same type
*/
argTypes.addAny();
returnType.addAny();
}
工厂验证输入类型并确定 getReturnType()
中的返回类型:
void getReturnType(ServerInterface &srvInterface,
const SizedColumnTypes &argTypes,
SizedColumnTypes &returnType) override
{
/*
* Three arguments: (array, slicebegin, sliceend)
* Validate manually since the prototype accepts any arguments.
*/
if (argTypes.size() != 3) {
vt_report_error(0, "Three arguments (array, slicebegin, sliceend) expected");
} else if (!argTypes[0].getType().isArrayType()) {
vt_report_error(1, "Argument 1 is not an array");
} else if (!argTypes[1].getType().isInt()) {
vt_report_error(2, "Argument 2 (slicebegin) is not an integer)");
} else if (!argTypes[2].getType().isInt()) {
vt_report_error(3, "Argument 3 (sliceend) is not an integer)");
}
/* return type is the same as the array arg type, copy it over */
returnType.push_back(argTypes[0]);
}
函数实施
使用 BlockReader
和 BlockWriter
调用 processBlock()
方法。第一个实参是一个数组。为了访问数组的元素,该方法使用 ArrayReader
。同样,它使用 ArrayWriter
来构造输出。
void processBlock(ServerInterface &srvInterface,
BlockReader &argReader,
BlockWriter &resWriter) override
{
do {
if (argReader.isNull(0) || argReader.isNull(1) || argReader.isNull(2)) {
resWriter.setNull();
} else {
Array::ArrayReader argArray = argReader.getArrayRef(0);
const vint slicebegin = argReader.getIntRef(1);
const vint sliceend = argReader.getIntRef(2);
Array::ArrayWriter outArray = resWriter.getArrayRef(0);
if (slicebegin < sliceend) {
for (int i = 0; i < slicebegin && argArray->hasData(); i++) {
argArray->next();
}
for (int i = slicebegin; i < sliceend && argArray->hasData(); i++) {
outArray->copyFromInput(*argArray);
outArray->next();
argArray->next();
}
}
outArray.commit(); /* finalize the written array elements */
}
resWriter.next();
} while (argReader.next());
}
12 - C++ 示例:返回多个值
编写 UDSF 时,可以指定多个返回值。如果您指定多个值,Vertica 会将它们打包到单个 ROW 作为返回值。您可以查询 ROW 中的字段或整个 ROW。
下面的示例实施一个名为 div(除法)的函数,它返回两个整数,即商和余数。
此示例显示了一种从 UDSF 返回 ROW 的方法。当输入和输出都是基元类型时,返回多个值并让 Vertica 构建 ROW 非常方便。您还可以直接使用复杂类型,如复杂类型作为实参中所述和 C++ 示例:使用复杂类型中所示。
加载和使用示例
加载库并创建函数,如下所示:
=> CREATE OR REPLACE LIBRARY ScalarFunctions AS '/home/dbadmin/examplesUDSF.so';
=> CREATE FUNCTION div AS
LANGUAGE 'C++' NAME 'DivFactory' LIBRARY ScalarFunctions;
创建一些数据并在其上调用函数,如下所示:
=> CREATE TABLE D (a INTEGER, b INTEGER);
COPY D FROM STDIN DELIMITER ',';
10,0
10,1
10,2
10,3
10,4
10,5
\.
=> SELECT a, b, Div(a, b), (Div(a, b)).quotient, (Div(a, b)).remainder FROM D;
a | b | Div | quotient | remainder
----+---+------------------------------------+----------+-----------
10 | 0 | {"quotient":null,"remainder":null} | |
10 | 1 | {"quotient":10,"remainder":0} | 10 | 0
10 | 2 | {"quotient":5,"remainder":0} | 5 | 0
10 | 3 | {"quotient":3,"remainder":1} | 3 | 1
10 | 4 | {"quotient":2,"remainder":2} | 2 | 2
10 | 5 | {"quotient":2,"remainder":0} | 2 | 0
(6 rows)
工厂实施
工厂在 getPrototype()
和 getReturnType()
中声明了两个返回值。工厂在其他方面没有作用。
void getPrototype(ServerInterface &interface,
ColumnTypes &argTypes,
ColumnTypes &returnType) override
{
argTypes.addInt();
argTypes.addInt();
returnType.addInt(); /* quotient */
returnType.addInt(); /* remainder */
}
void getReturnType(ServerInterface &srvInterface,
const SizedColumnTypes &argTypes,
SizedColumnTypes &returnType) override
{
returnType.addInt("quotient");
returnType.addInt("remainder");
}
函数实施
该函数在 processBlock()
中写入两个输出值。此处值的数量必须与工厂声明相匹配。
class Div : public ScalarFunction {
void processBlock(Vertica::ServerInterface &srvInterface,
Vertica::BlockReader &argReader,
Vertica::BlockWriter &resWriter) override
{
do {
if (argReader.isNull(0) || argReader.isNull(1) || (argReader.getIntRef(1) == 0)) {
resWriter.setNull(0);
resWriter.setNull(1);
} else {
const vint dividend = argReader.getIntRef(0);
const vint divisor = argReader.getIntRef(1);
resWriter.setInt(0, dividend / divisor);
resWriter.setInt(1, dividend % divisor);
}
resWriter.next();
} while (argReader.next());
}
};
13 - C++ 示例:从检查约束调用 UDSF
此示例显示了创建可由检查约束调用的 UDSF 所需的 C++ 代码。示例函数的名称是 LargestSquareBelow
。此示例函数确定其平方小于主题列中的数字的最大数字。例如,如果该列中的数字是 1000,则其平方 (961) 小于 1000 的最大数字是 31。
重要
在检查约束中使用的 UDSF 必须是不可变的,并且该约束必须能够正确处理 null 值。否则,检查约束可能无法按预期工作。此外,Vertica 还会评估在已加载或更新的每个行上启用的检查约束的谓词,因此请考虑编写函数时的性能。
有关检查约束的信息,请参阅检查约束。
加载和使用示例
以下示例显示了如何使用 CREATE LIBRARY 创建和加载名为 MySqLib 的库。将此示例中的库路径调整为绝对路径,并将文件名调整为共享对象 LargestSquareBelow
保存到的位置。
创建库:
=> CREATE OR REPLACE LIBRARY MySqLib AS '/home/dbadmin/LargestSquareBelow.so';
- 创建并加载库之后,使用 CREATE FUNCTION(标量) 语句将函数添加到编录中:
=> CREATE OR REPLACE FUNCTION largestSqBelow AS LANGUAGE 'C++' NAME 'LargestSquareBelowInfo' LIBRARY MySqLib;
- 下一步,将 UDSF 包含到检查约束中。
=> CREATE TABLE squaretest(
ceiling INTEGER UNIQUE,
CONSTRAINT chk_sq CHECK (largestSqBelow(ceiling) < ceiling*ceiling)
);
- 将数据添加到表
squaretest
中:
=> COPY squaretest FROM stdin DELIMITER ','NULL'null';
-1
null
0
1
1000
1000000
1000001
\.
根据所使用的数据,输出应类似于以下示例:
SELECT ceiling, largestSqBelow(ceiling)
FROM squaretest ORDER BY ceiling;
ceiling | largestSqBelow
---------+----------------
|
-1 |
0 |
1 | 0
1000 | 31
1000000 | 999
1000001 | 1000
(7 rows)
ScalarFunction 实施
此 ScalarFunction
实施为 UDSF 执行处理工作,即确定其平方小于数字输入的最大数字。
#include "Vertica.h"
/*
* ScalarFunction implementation for a UDSF that
* determines the largest number whose square is less than
* the number input.
*/
class LargestSquareBelow : public Vertica::ScalarFunction
{
public:
/*
* This function does all of the actual processing for the UDSF.
* The inputs are retrieved via arg_reader
* The outputs are returned via arg_writer
*
*/
virtual void processBlock(Vertica::ServerInterface &srvInterface,
Vertica::BlockReader &arg_reader,
Vertica::BlockWriter &res_writer)
{
if (arg_reader.getNumCols() != 1)
vt_report_error(0, "Function only accept 1 argument, but %zu provided", arg_reader.getNumCols());
// While we have input to process
do {
// Read the input parameter by calling the
// BlockReader.getIntRef class function
const Vertica::vint a = arg_reader.getIntRef(0);
Vertica::vint res;
//Determine the largest square below the number
if ((a != Vertica::vint_null) && (a > 0))
{
res = (Vertica::vint)sqrt(a - 1);
}
else
res = Vertica::vint_null;
//Call BlockWriter.setInt to store the output value,
//which is the largest square
res_writer.setInt(res);
//Write the row and advance to the next output row
res_writer.next();
//Continue looping until there are no more input rows
} while (arg_reader.next());
}
};
ScalarFunctionFactory 实施
此 ScalarFunctionFactory
实施执行对输入和输出的处理工作,并将函数标记为不可变(如果计划在检查约束中使用 UDSF,则必须满足此要求)。
class LargestSquareBelowInfo : public Vertica::ScalarFunctionFactory
{
//return an instance of LargestSquareBelow to perform the computation.
virtual Vertica::ScalarFunction *createScalarFunction(Vertica::ServerInterface &srvInterface)
//Call the vt_createFuncObj to create the new LargestSquareBelow class instance.
{ return Vertica::vt_createFuncObject<LargestSquareBelow>(srvInterface.allocator); }
/*
* This function returns the description of the input and outputs of the
* LargestSquareBelow class's processBlock function. It stores this information in
* two ColumnTypes objects, one for the input parameter, and one for
* the return value.
*/
virtual void getPrototype(Vertica::ServerInterface &srvInterface,
Vertica::ColumnTypes &argTypes,
Vertica::ColumnTypes &returnType)
{
// Takes one int as input, so adds int to the argTypes object
argTypes.addInt();
// Returns a single int, so add a single int to the returnType object.
// ScalarFunctions always return a single value.
returnType.addInt();
}
public:
// the function cannot be called within a check constraint unless the UDx author
// certifies that the function is immutable:
LargestSquareBelowInfo() { vol = Vertica::IMMUTABLE; }
};
RegisterFactory 宏
使用 RegisterFactory
宏注册一个 ScalarFunctionFactory
子类。该宏将对工厂类进行实例化并将其包含的元数据供 Vertica 访问。要调用该宏,请将您的工厂类的名称传递给它。
RegisterFactory(LargestSquareBelowInfo);