Python example: count elements

The following example shows a user-defined transform function (UDTF) that takes a partition of arrays, counts each distinct array element in the partition, and outputs each element and its count as a row value. You can call the function on tables that contain multiple partitions of arrays.

The complete source code is in /opt/vertica/sdk/examples/python/TransformFunctions.py.

Loading and using the example

Load the library and create the transform function as follows:

=> CREATE OR REPLACE LIBRARY TransformFunctions AS '/home/dbadmin/examples/python/TransformFunctions.py' LANGUAGE 'Python';

=> CREATE TRANSFORM FUNCTION CountElements AS LANGUAGE 'Python' NAME 'countElementsUDTFactory' LIBRARY TransformFunctions;

You can create some data and then call the function on it, for example:


=> CREATE TABLE orders (storeID int, productIDs array[int]);
CREATE TABLE

=> INSERT INTO orders VALUES
    (1, array[101, 102, 103]),
    (1, array[102, 104]),
    (1, array[101, 102, 102, 201, 203]),
    (2, array[101, 202, 203, 202, 203]),
    (2, array[203]),
    (2, array[]);
OUTPUT
--------
6
(1 row)

=> COMMIT;
COMMIT

=> SELECT storeID, CountElements(productIDs) OVER (PARTITION BY storeID) FROM orders;
storeID |       element_count
--------+---------------------------
      1 | {"element":101,"count":2}
      1 | {"element":102,"count":4}
      1 | {"element":103,"count":1}
      1 | {"element":104,"count":1}
      1 | {"element":201,"count":1}
      1 | {"element":203,"count":1}
      2 | {"element":101,"count":1}
      2 | {"element":202,"count":2}
      2 | {"element":203,"count":3}
(9 rows)
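
To make the partitioning semantics concrete, the following is a minimal sketch in plain Python, outside Vertica, that models what the query computes: rows are grouped by storeID and the array elements are counted within each group. It does not use the Vertica SDK. The helper name count_elements_by_partition is hypothetical and exists only for this illustration.

```python
from collections import Counter, defaultdict

# Sample rows from the orders table: (storeID, productIDs)
orders = [
    (1, [101, 102, 103]),
    (1, [102, 104]),
    (1, [101, 102, 102, 201, 203]),
    (2, [101, 202, 203, 202, 203]),
    (2, [203]),
    (2, []),
]


def count_elements_by_partition(rows):
    """Group (key, array) rows by key and count array elements per group."""
    partitions = defaultdict(Counter)
    for key, arr in rows:
        # Counter.update adds one to the count of every element in arr
        partitions[key].update(arr)
    return partitions


counts = count_elements_by_partition(orders)

# Print one line per (storeID, element, count), ordered for readability
for store_id in sorted(counts):
    for element, count in sorted(counts[store_id].items()):
        print(store_id, element, count)
```

Each storeID corresponds to one partition in the OVER (PARTITION BY storeID) clause, so counts never mix across stores.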

Setup

All Python UDxs must import the Vertica SDK library:

import vertica_sdk

Factory implementation

The getPrototype() method declares that the inputs and outputs can be of any type, which means that type enforcement must be done elsewhere:


def getPrototype(self, srv_interface, arg_types, return_type):
    arg_types.addAny()
    return_type.addAny()

getReturnType() validates that the only argument to the function is an array and that the return type is a row with 'element' and 'count' fields:


def getReturnType(self, srv_interface, arg_types, return_type):

    if arg_types.getColumnCount() != 1:
        srv_interface.reportError(1, 'countElements UDT should take exactly one argument')

    if not arg_types.getColumnType(0).isArrayType():
        srv_interface.reportError(2, 'Argument to countElements UDT should be an ARRAY')

    retRowFields = vertica_sdk.SizedColumnTypes.makeEmpty()
    retRowFields.addColumn(arg_types.getColumnType(0).getElementType(), 'element')
    retRowFields.addInt('count')
    return_type.addRowType(retRowFields, 'element_count')

The countElementsUDTFactory class also contains a createTransformFunction() method that instantiates and returns the transform function.

Function implementation

The processPartition() method is called with a BlockReader and a BlockWriter, named arg_reader and res_writer respectively. The function loops through all the input arrays in a partition and uses a dictionary to collect the frequency of each element. To access elements of each input array, the method instantiates an ArrayReader. After collecting the element counts, the function writes each element and its count to a row. If a partition contains no elements at all, the function still writes one row, with an empty (None) element and a count of 0. This process is repeated for each partition.


def processPartition(self, srv_interface, arg_reader, res_writer):

    elemCounts = dict()
    # Collect element counts for entire partition
    while True:
        if not arg_reader.isNull(0):
            arr = arg_reader.getArray(0)
            for elem in arr:
                elemCounts[elem] = elemCounts.setdefault(elem, 0) + 1

        if not arg_reader.next():
            break

    # Write at least one value for each partition
    if len(elemCounts) == 0:
        elemCounts[None] = 0

    # Write out element counts as (element, count) pairs
    for pair in elemCounts.items():
        res_writer.setRow(0, pair)
        res_writer.next()
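
The read loop and the fallback row can be exercised outside Vertica with simple stand-ins. The following is a sketch only: FakeBlockReader and FakeBlockWriter are hypothetical classes written for this illustration and are not part of vertica_sdk. They mimic only the calls used above (isNull, getArray, next, setRow), and the assumption that getArray() yields a plain iterable simplifies the real ArrayReader.

```python
class FakeBlockReader:
    """Hypothetical stand-in for the SDK reader: one array (or None) per row."""

    def __init__(self, rows):
        self._rows = rows
        self._pos = 0

    def isNull(self, col):
        return self._rows[self._pos] is None

    def getArray(self, col):
        return self._rows[self._pos]

    def next(self):
        # Advance one row; return False once the partition is exhausted
        self._pos += 1
        return self._pos < len(self._rows)


class FakeBlockWriter:
    """Hypothetical stand-in for the SDK writer: collects rows in a list."""

    def __init__(self):
        self.rows = []
        self._current = None

    def setRow(self, col, value):
        self._current = value

    def next(self):
        self.rows.append(self._current)


def process_partition(arg_reader, res_writer):
    # Same structure as processPartition(): read the current row first,
    # then advance, so the first row of the partition is not skipped
    elem_counts = {}
    while True:
        if not arg_reader.isNull(0):
            for elem in arg_reader.getArray(0):
                elem_counts[elem] = elem_counts.get(elem, 0) + 1
        if not arg_reader.next():
            break

    # Emit at least one row, even for a partition with no elements
    if not elem_counts:
        elem_counts[None] = 0

    for pair in elem_counts.items():
        res_writer.setRow(0, pair)
        res_writer.next()


# Partition for storeID 2 from the sample data
writer = FakeBlockWriter()
process_partition(FakeBlockReader([[101, 202, 203, 202, 203], [203], []]), writer)
store2_rows = writer.rows

# A partition holding only a NULL array and an empty array
empty_writer = FakeBlockWriter()
process_partition(FakeBlockReader([None, []]), empty_writer)
fallback_rows = empty_writer.rows
```

In this sketch, store2_rows holds one (element, count) pair per distinct element, and fallback_rows holds the single (None, 0) pair produced by the fallback branch.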