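Python example: count elements
The following example details a UDTF that takes a partition of arrays, computes the count of each distinct array element in the partition, and outputs each element and its count as a row value. You can call the function on tables that contain multiple partitions of arrays.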
The complete source code is in /opt/vertica/sdk/examples/python/TransformFunctions.py.
Loading and using the example
Load the library and create the transform function as follows:
=> CREATE OR REPLACE LIBRARY TransformFunctions AS '/home/dbadmin/examples/python/TransformFunctions.py' LANGUAGE 'Python';
=> CREATE TRANSFORM FUNCTION CountElements AS LANGUAGE 'Python' NAME 'countElementsUDTFactory' LIBRARY TransformFunctions;
You can create some data and then call the function on it, for example:
=> CREATE TABLE orders (storeID int, productIDs array[int]);
CREATE TABLE
=> INSERT INTO orders VALUES
(1, array[101, 102, 103]),
(1, array[102, 104]),
(1, array[101, 102, 102, 201, 203]),
(2, array[101, 202, 203, 202, 203]),
(2, array[203]),
(2, array[]);
OUTPUT
--------
6
(1 row)
=> COMMIT;
COMMIT
=> SELECT storeID, CountElements(productIDs) OVER (PARTITION BY storeID) FROM orders;
storeID | element_count
--------+---------------------------
1 | {"element":101,"count":2}
1 | {"element":102,"count":4}
1 | {"element":103,"count":1}
1 | {"element":104,"count":1}
1 | {"element":201,"count":1}
1 | {"element":203,"count":1}
2 | {"element":101,"count":1}
2 | {"element":202,"count":2}
2 | {"element":203,"count":3}
(9 rows)
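The per-store counts for the sample data can also be computed in plain Python, outside Vertica, which is a convenient way to check what the query should return. This is only a sketch: the `orders` list and the `count_elements_by_store` helper are hypothetical names introduced for illustration. They mirror the rows inserted above and are not part of the SDK example.

```python
from collections import Counter, defaultdict

# Hypothetical in-memory copy of the rows inserted into the orders table.
orders = [
    (1, [101, 102, 103]),
    (1, [102, 104]),
    (1, [101, 102, 102, 201, 203]),
    (2, [101, 202, 203, 202, 203]),
    (2, [203]),
    (2, []),
]


def count_elements_by_store(rows):
    """Count how often each array element occurs within each store partition."""
    counts = defaultdict(Counter)
    for store_id, product_ids in rows:
        counts[store_id].update(product_ids)
    return counts


if __name__ == "__main__":
    # Print one line per (store, element) pair, ordered like the query output.
    for store_id, counter in sorted(count_elements_by_store(orders).items()):
        for element, count in sorted(counter.items()):
            print(store_id, {"element": element, "count": count})
```

Each store maps to a `Counter`, which plays the role of the per-partition dictionary used by the transform function.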
Setup
All Python UDxs must import the Vertica SDK library:
import vertica_sdk
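Factory implementation
The getPrototype() method declares that the input and output can be of any type, which means that type enforcement must be done elsewhere: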
def getPrototype(self, srv_interface, arg_types, return_type):
    arg_types.addAny()
    return_type.addAny()
getReturnType() validates that the function's only argument is an array, and declares the return type as a row with "element" and "count" fields:
def getReturnType(self, srv_interface, arg_types, return_type):
    if arg_types.getColumnCount() != 1:
        srv_interface.reportError(1, 'countElements UDT should take exactly one argument')
    if not arg_types.getColumnType(0).isArrayType():
        srv_interface.reportError(2, 'Argument to countElements UDT should be an ARRAY')
    retRowFields = vertica_sdk.SizedColumnTypes.makeEmpty()
    retRowFields.addColumn(arg_types.getColumnType(0).getElementType(), 'element')
    retRowFields.addInt('count')
    return_type.addRowType(retRowFields, 'element_count')
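To see how the two argument checks behave without a running database, the sketch below replays them against small stand-in objects. The `StubColumnType`, `StubArgTypes`, and `StubServerInterface` classes and the `validate_arguments` helper are hypothetical and exist only for illustration; they model just the calls used in the checks. The sketch also assumes that reporting an error stops the call, which the stub imitates by raising `ValueError`.

```python
class StubColumnType:
    """Hypothetical stand-in for a column type; only isArrayType() is modeled."""

    def __init__(self, is_array):
        self._is_array = is_array

    def isArrayType(self):
        return self._is_array


class StubArgTypes:
    """Hypothetical stand-in for arg_types, backed by a list of column types."""

    def __init__(self, column_types):
        self._column_types = column_types

    def getColumnCount(self):
        return len(self._column_types)

    def getColumnType(self, index):
        return self._column_types[index]


class StubServerInterface:
    """Hypothetical stand-in that raises instead of reporting to the server."""

    def reportError(self, code, message):
        raise ValueError(f"[{code}] {message}")


def validate_arguments(srv_interface, arg_types):
    """Apply the same two checks as getReturnType(), without building the row type."""
    if arg_types.getColumnCount() != 1:
        srv_interface.reportError(1, 'countElements UDT should take exactly one argument')
    if not arg_types.getColumnType(0).isArrayType():
        srv_interface.reportError(2, 'Argument to countElements UDT should be an ARRAY')


if __name__ == "__main__":
    srv = StubServerInterface()
    validate_arguments(srv, StubArgTypes([StubColumnType(True)]))  # passes silently
    try:
        validate_arguments(srv, StubArgTypes([StubColumnType(False)]))
    except ValueError as err:
        print(err)
```

Because the count check runs first, a call with no arguments is rejected before the code tries to inspect column 0.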
The countElementsUDTFactory class also contains a createTransformFunction() method that instantiates and returns the transform function.
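Function implementation
The processPartition() method is called with a BlockReader and a BlockWriter, named arg_reader and res_writer respectively. The function loops through all the input arrays in a partition and uses a dictionary to collect the frequency of each element. To access the elements of each input array, the method instantiates an ArrayReader. After collecting the element counts, the function writes each element and its count to a row. If a partition contains no elements, the function still writes one row, with a null element and a count of 0. This process is repeated for each partition.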
def processPartition(self, srv_interface, arg_reader, res_writer):
    elemCounts = dict()
    # Collect element counts for entire partition
    while True:
        if not arg_reader.isNull(0):
            arr = arg_reader.getArray(0)
            for elem in arr:
                elemCounts[elem] = elemCounts.setdefault(elem, 0) + 1
        if not arg_reader.next():
            break
    # Write at least one value for each partition
    if len(elemCounts) == 0:
        elemCounts[None] = 0
    # Write out element counts as (element, count) pairs
    for pair in elemCounts.items():
        res_writer.setRow(0, pair)
        res_writer.next()
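The counting loop can be exercised outside Vertica by feeding it stand-in reader and writer objects. The sketch below is one way to do that. `FakeReader`, `FakeWriter`, and the free function `process_partition` are hypothetical names, not SDK classes; they implement only the calls used in the method above, and they assume that each partition holds at least one row.

```python
class FakeReader:
    """Hypothetical stand-in for the reader: one array (or None for NULL) per row."""

    def __init__(self, rows):
        self._rows = rows
        self._pos = 0

    def isNull(self, col):
        return self._rows[self._pos] is None

    def getArray(self, col):
        return self._rows[self._pos]

    def next(self):
        # Advance to the next row; return False once the partition is exhausted.
        self._pos += 1
        return self._pos < len(self._rows)


class FakeWriter:
    """Hypothetical stand-in for the writer: collects every row that is written."""

    def __init__(self):
        self.rows = []
        self._current = None

    def setRow(self, col, value):
        self._current = value

    def next(self):
        self.rows.append(self._current)
        self._current = None
        return True


def process_partition(arg_reader, res_writer):
    """Same counting logic as processPartition(), minus srv_interface."""
    elem_counts = {}
    while True:
        if not arg_reader.isNull(0):
            for elem in arg_reader.getArray(0):
                elem_counts[elem] = elem_counts.get(elem, 0) + 1
        if not arg_reader.next():
            break
    if not elem_counts:
        elem_counts[None] = 0  # emit one row even for an empty partition
    for pair in elem_counts.items():
        res_writer.setRow(0, pair)
        res_writer.next()


if __name__ == "__main__":
    writer = FakeWriter()
    # Rows of store 2 from the sample data.
    process_partition(FakeReader([[101, 202, 203, 202, 203], [203], []]), writer)
    print(writer.rows)
```

Passing a partition whose arrays are all empty or NULL yields a single `(None, 0)` row, which matches the "write at least one value" branch of the method.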