Python 示例:复杂类型的 JSON 解析器

以下示例详细说明了 UDParser,它接受 JSON 对象并将其解析为复杂类型。对于此示例,解析器假设输入数据是具有两个整数字段的行数组。输入记录应使用换行符进行分隔。如果 JSON 输入未指定任何行字段,则函数会将这些字段解析为 NULL。

此 UDParser 的源代码还包含一个工厂方法,用于解析具有整数和整数字段数组的行。解析器的实施与工厂中的返回类型无关,因此您可以创建具有不同返回类型且均指向 prepare() 方法中的 ComplexJsonParser() 类的工厂。完整的源代码位于 /opt/vertica/sdk/examples/python/UDParsers.py 中。

加载和使用示例

加载库并创建解析器,如下所示:


=> CREATE OR REPLACE LIBRARY UDParsers AS '/home/dbadmin/examples/python/UDParsers.py' LANGUAGE 'Python';

=> CREATE PARSER ComplexJsonParser AS LANGUAGE 'Python' NAME 'ArrayJsonParserFactory' LIBRARY UDParsers;

您现在可以定义一个表,然后使用 JSON 解析器将数据加载到其中,例如:


=> CREATE TABLE orders (a bool, arr array[row(a int, b int)]);
CREATE TABLE

=> COPY orders (arr) FROM STDIN WITH PARSER ComplexJsonParser();
[]
[{"a":1, "b":10}]
[{"a":1, "b":10}, {"a":null, "b":10}]
[{"a":1, "b":10},{"a":10, "b":20}]
[{"a":1, "b":10}, {"a":null, "b":null}]
[{"a":1, "b":2}, {"a":3, "b":4}, {"a":5, "b":6}, {"a":7, "b":8}, {"a":9, "b":10}, {"a":11, "b":12}, {"a":13, "b":14}]
\.

=> SELECT * FROM orders;
a |                                  arr
--+--------------------------------------------------------------------------
  | []
  | [{"a":1,"b":10}]
  | [{"a":1,"b":10},{"a":null,"b":10}]
  | [{"a":1,"b":10},{"a":10,"b":20}]
  | [{"a":1,"b":10},{"a":null,"b":null}]
  | [{"a":1,"b":2},{"a":3,"b":4},{"a":5,"b":6},{"a":7,"b":8},{"a":9,"b":10},{"a":11,"b":12},{"a":13,"b":14}]
(6 rows)

设置

所有 Python UDx 都必须导入 Vertica SDK 库。 ComplexJsonParser() 也需要 json 库。

import vertica_sdk
import json

工厂实施

prepare() 方法将实例化并返回解析器:


def prepare(self, srvInterface, perColumnParamReader, planCtxt, returnType):
    return ComplexJsonParser()

getParserReturnType() 声明返回类型必须是行数组,其中每个行都有两个整数字段:


def getParserReturnType(self, rvInterface, perColumnParamReader, planCtxt, argTypes, returnType):
    fieldTypes = vertica_sdk.SizedColumnTypes.makeEmpty()
    fieldTypes.addInt('a')
    fieldTypes.addInt('b')
    returnType.addArrayType(vertica_sdk.SizedColumnTypes.makeRowType(fieldTypes, 'elements'), 64, 'arr')

解析器实施

process() 方法会使用 InputBuffer 读入数据,然后在换行符处拆分该输入数据。随后,该方法会将处理后的数据传递给 writeRows() 方法。 writeRows() 会将每个数据行转换为 JSON 对象,检查该 JSON 对象的类型,然后将相应的值或对象写入输出。


class ComplexJsonParser(vertica_sdk.UDParser):

    leftover = ''

    def process(self, srvInterface, input_buffer, input_state, writer):
        input_buffer.setEncoding('utf-8')

        self.count = 0
        rec = self.leftover + input_buffer.read()
        row_lst = rec.split('\n')
        self.leftover = row_lst[-1]
        self.writeRows(row_lst[:-1], writer)
        if input_state == InputState.END_OF_FILE:
            self.writeRows([self.leftover], writer)
            return StreamState.DONE
        else:
            return StreamState.INPUT_NEEDED

    def writeRows(self, str_lst, writer):
        for s in str_lst:
            stripped = s.strip()
            if len(stripped) == 0:
                return
            elif len(stripped) > 1 and stripped[0:2] == "//":
                continue
            jsonValue = json.loads(stripped)
            if type(jsonValue) is list:
                writer.setArray(0, jsonValue)
            elif jsonValue is None:
                writer.setNull(0)
            else:
                writer.setRow(0, jsonValue)
            writer.next()