Python example: count elements

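The following example details a UDTF that takes a partition of arrays, computes the count of each distinct array element in the partition, and outputs each element and its count as a row value. You can call the function on tables that contain multiple partitions of arrays.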

The complete source code is in /opt/vertica/sdk/examples/python/TransformFunctions.py.

Loading and using the example

Load the library and create the transform function as follows:

=> CREATE OR REPLACE LIBRARY TransformFunctions AS '/home/dbadmin/examples/python/TransformFunctions.py' LANGUAGE 'Python';

=> CREATE TRANSFORM FUNCTION CountElements AS LANGUAGE 'Python' NAME 'countElementsUDTFactory' LIBRARY TransformFunctions;

You can create some data and then call the function on it, for example:


=> CREATE TABLE orders (storeID int, productIDs array[int]);
CREATE TABLE

=> INSERT INTO orders VALUES
    (1, array[101, 102, 103]),
    (1, array[102, 104]),
    (1, array[101, 102, 102, 201, 203]),
    (2, array[101, 202, 203, 202, 203]),
    (2, array[203]),
    (2, array[]);
OUTPUT
--------
6
(1 row)

=> COMMIT;
COMMIT

=> SELECT storeID, CountElements(productIDs) OVER (PARTITION BY storeID) FROM orders;
storeID |       element_count
--------+---------------------------
      1 | {"element":101,"count":2}
      1 | {"element":102,"count":4}
      1 | {"element":103,"count":1}
      1 | {"element":104,"count":1}
      1 | {"element":201,"count":1}
      1 | {"element":203,"count":1}
      2 | {"element":101,"count":1}
      2 | {"element":202,"count":2}
      2 | {"element":203,"count":3}
(9 rows)
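To check the expected result above without a database, the following plain-Python sketch groups the sample rows by storeID (as PARTITION BY storeID does) and counts each distinct element within each group. It models only the query's semantics; the names orders and count_elements_by_store are hypothetical and are not part of the Vertica SDK.

```python
from collections import Counter, defaultdict

# Sample rows from the INSERT statement above: (storeID, productIDs)
orders = [
    (1, [101, 102, 103]),
    (1, [102, 104]),
    (1, [101, 102, 102, 201, 203]),
    (2, [101, 202, 203, 202, 203]),
    (2, [203]),
    (2, []),
]


def count_elements_by_store(rows):
    """Hypothetical helper: emulate PARTITION BY storeID plus per-partition counting."""
    partitions = defaultdict(Counter)
    for store_id, product_ids in rows:
        # An empty array adds nothing to its partition's counts
        partitions[store_id].update(product_ids)
    return {store: dict(counts) for store, counts in partitions.items()}


# Print one line per (storeID, element) pair, like the query output
for store, counts in sorted(count_elements_by_store(orders).items()):
    for element, count in sorted(counts.items()):
        print(store, {"element": element, "count": count})
```

Each partition yields one output row per distinct element, which is why the query returns 9 rows for 6 input rows.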

Setup

All Python UDxs must import the Vertica SDK library:

import vertica_sdk

Factory implementation

The getPrototype() method declares that the inputs and outputs can be of any type, which means that type enforcement must be done elsewhere:


def getPrototype(self, srv_interface, arg_types, return_type):
    arg_types.addAny()
    return_type.addAny()

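getReturnType() validates that the function's only argument is an array, and declares the return type as a row with an "element" field (of the array's element type) and an integer "count" field: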


def getReturnType(self, srv_interface, arg_types, return_type):

    if arg_types.getColumnCount() != 1:
        srv_interface.reportError(1, 'countElements UDT should take exactly one argument')

    if not arg_types.getColumnType(0).isArrayType():
        srv_interface.reportError(2, 'Argument to countElements UDT should be an ARRAY')

    retRowFields = vertica_sdk.SizedColumnTypes.makeEmpty()
    retRowFields.addColumn(arg_types.getColumnType(0).getElementType(), 'element')
    retRowFields.addInt('count')
    return_type.addRowType(retRowFields, 'element_count')
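The checks in getReturnType() can be mirrored in plain Python to show how the output row type is derived: the element field reuses the array's element type, and count is always an integer. In this sketch, argument types are modeled as hypothetical strings such as 'ARRAY[INT]', and raising ValueError stands in for srv_interface.reportError(); neither representation is part of the Vertica SDK.

```python
def derive_return_fields(arg_types):
    """Hypothetical stand-in for getReturnType(): validate args, build row fields.

    arg_types is a list of type names, e.g. ["ARRAY[INT]"].
    Returns a list of (field_name, field_type) pairs for the output ROW.
    """
    # Mirrors the arg_types.getColumnCount() != 1 check
    if len(arg_types) != 1:
        raise ValueError("countElements UDT should take exactly one argument")

    arg = arg_types[0]
    # Mirrors the isArrayType() check: only ARRAY[...] arguments are accepted
    if not (arg.startswith("ARRAY[") and arg.endswith("]")):
        raise ValueError("Argument to countElements UDT should be an ARRAY")

    # Mirrors getElementType(): strip the ARRAY[...] wrapper
    element_type = arg[len("ARRAY["):-1]
    return [("element", element_type), ("count", "INT")]


print(derive_return_fields(["ARRAY[INT]"]))  # [('element', 'INT'), ('count', 'INT')]
```

Because getPrototype() accepts any type, a call with a non-array argument, such as CountElements(storeID), is rejected only at this step.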

The countElementsUDTFactory class also contains a createTransformFunction() method that instantiates and returns the transform function.

Function implementation

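The processPartition() method is called with a BlockReader and a BlockWriter, named arg_reader and res_writer respectively. The function loops through all the input arrays in the partition and uses a dictionary to collect the frequency of each element. To access the elements of each input array, it calls arg_reader.getArray(0) and iterates over the result; NULL arrays are skipped. After collecting the counts, the function writes each element and its count to a row. If the partition contains no elements, it writes a single row with a NULL element and a count of 0, so that every partition produces at least one output row. Vertica calls processPartition() once for each partition.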


def processPartition(self, srv_interface, arg_reader, res_writer):

    elemCounts = {}
    # Collect element counts for entire partition.
    # arg_reader starts on the partition's first row; next() returns False after the last row.
    while True:
        if not arg_reader.isNull(0):
            arr = arg_reader.getArray(0)
            for elem in arr:
                elemCounts[elem] = elemCounts.get(elem, 0) + 1

        if not arg_reader.next():
            break

    # Write at least one value for each partition
    if len(elemCounts) == 0:
        elemCounts[None] = 0

    # Write out element counts as (element, count) pairs
    for pair in elemCounts.items():
        res_writer.setRow(0, pair)
        res_writer.next()
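Outside Vertica, the partition loop can be exercised with small fakes. FakeBlockReader and FakeBlockWriter below are hypothetical stand-ins that implement only the calls processPartition() uses (isNull, getArray, next, setRow). They assume, as the loop above does, that the reader is already positioned on the first row when the method is called and that next() returns False after the last row; the real BlockReader and BlockWriter are provided by the SDK.

```python
class FakeBlockReader:
    """Hypothetical stand-in for BlockReader over one column of arrays."""

    def __init__(self, rows):
        self.rows = rows  # list of arrays (None represents a SQL NULL)
        self.pos = 0      # positioned on the first row

    def isNull(self, col):
        return self.rows[self.pos] is None

    def getArray(self, col):
        return self.rows[self.pos]

    def next(self):
        # Advance one row; return False once there are no more rows
        self.pos += 1
        return self.pos < len(self.rows)


class FakeBlockWriter:
    """Hypothetical stand-in for BlockWriter that records emitted rows."""

    def __init__(self):
        self.output = []
        self.current = None

    def setRow(self, col, value):
        self.current = value

    def next(self):
        # Commit the current row and move on
        self.output.append(self.current)


def process_partition(arg_reader, res_writer):
    """Same logic as processPartition() above, without srv_interface."""
    elem_counts = {}
    while True:
        if not arg_reader.isNull(0):
            for elem in arg_reader.getArray(0):
                elem_counts[elem] = elem_counts.get(elem, 0) + 1
        if not arg_reader.next():
            break
    # Write at least one row per partition
    if not elem_counts:
        elem_counts[None] = 0
    for pair in elem_counts.items():
        res_writer.setRow(0, pair)
        res_writer.next()


writer = FakeBlockWriter()
process_partition(FakeBlockReader([[101, 202, 203, 202, 203], [203], []]), writer)
print(writer.output)  # [(101, 1), (202, 2), (203, 3)]
```

Feeding in store 2's three arrays reproduces the store 2 rows of the query output, while a partition whose arrays are all NULL or empty yields only the placeholder row (None, 0).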