Python 示例:复杂类型的 JSON 解析器
以下示例详细说明了 UDParser,它接受 JSON 对象并将其解析为复杂类型。对于此示例,解析器假设输入数据是具有两个整数字段的行数组。输入记录应使用换行符进行分隔。如果 JSON 输入未指定任何行字段,则函数会将这些字段解析为 NULL。
此 UDParser 的源代码还包含一个工厂方法,用于解析具有整数和整数字段数组的行。解析器的实施与工厂中的返回类型无关,因此您可以创建具有不同返回类型且均指向 prepare()
方法中的 ComplexJsonParser()
类的工厂。完整的源代码位于 /opt/vertica/sdk/examples/python/UDParsers.py
中。
加载和使用示例
加载库并创建解析器,如下所示:
=> CREATE OR REPLACE LIBRARY UDParsers AS '/home/dbadmin/examples/python/UDParsers.py' LANGUAGE 'Python';
=> CREATE PARSER ComplexJsonParser AS LANGUAGE 'Python' NAME 'ArrayJsonParserFactory' LIBRARY UDParsers;
您现在可以定义一个表,然后使用 JSON 解析器将数据加载到其中,例如:
=> CREATE TABLE orders (a bool, arr array[row(a int, b int)]);
CREATE TABLE
=> COPY orders (arr) FROM STDIN WITH PARSER ComplexJsonParser();
[]
[{"a":1, "b":10}]
[{"a":1, "b":10}, {"a":null, "b":10}]
[{"a":1, "b":10},{"a":10, "b":20}]
[{"a":1, "b":10}, {"a":null, "b":null}]
[{"a":1, "b":2}, {"a":3, "b":4}, {"a":5, "b":6}, {"a":7, "b":8}, {"a":9, "b":10}, {"a":11, "b":12}, {"a":13, "b":14}]
\.
=> SELECT * FROM orders;
a | arr
--+--------------------------------------------------------------------------
| []
| [{"a":1,"b":10}]
| [{"a":1,"b":10},{"a":null,"b":10}]
| [{"a":1,"b":10},{"a":10,"b":20}]
| [{"a":1,"b":10},{"a":null,"b":null}]
| [{"a":1,"b":2},{"a":3,"b":4},{"a":5,"b":6},{"a":7,"b":8},{"a":9,"b":10},{"a":11,"b":12},{"a":13,"b":14}]
(6 rows)
设置
所有 Python UDx 都必须导入 Vertica SDK 库。 ComplexJsonParser()
也需要 json 库。
import vertica_sdk
import json
工厂实施
prepare()
方法将实例化并返回解析器:
def prepare(self, srvInterface, perColumnParamReader, planCtxt, returnType):
return ComplexJsonParser()
getParserReturnType()
声明返回类型必须是行数组,其中每个行都有两个整数字段:
def getParserReturnType(self, rvInterface, perColumnParamReader, planCtxt, argTypes, returnType):
fieldTypes = vertica_sdk.SizedColumnTypes.makeEmpty()
fieldTypes.addInt('a')
fieldTypes.addInt('b')
returnType.addArrayType(vertica_sdk.SizedColumnTypes.makeRowType(fieldTypes, 'elements'), 64, 'arr')
解析器实施
process()
方法会使用 InputBuffer
读入数据,然后在换行符处拆分该输入数据。随后,该方法会将处理后的数据传递给 writeRows()
方法。 writeRows()
会将每个数据行转换为 JSON 对象,检查该 JSON 对象的类型,然后将相应的值或对象写入输出。
class ComplexJsonParser(vertica_sdk.UDParser):
leftover = ''
def process(self, srvInterface, input_buffer, input_state, writer):
input_buffer.setEncoding('utf-8')
self.count = 0
rec = self.leftover + input_buffer.read()
row_lst = rec.split('\n')
self.leftover = row_lst[-1]
self.writeRows(row_lst[:-1], writer)
if input_state == InputState.END_OF_FILE:
self.writeRows([self.leftover], writer)
return StreamState.DONE
else:
return StreamState.INPUT_NEEDED
def writeRows(self, str_lst, writer):
for s in str_lst:
stripped = s.strip()
if len(stripped) == 0:
return
elif len(stripped) > 1 and stripped[0:2] == "//":
continue
jsonValue = json.loads(stripped)
if type(jsonValue) is list:
writer.setArray(0, jsonValue)
elif jsonValue is None:
writer.setNull(0)
else:
writer.setRow(0, jsonValue)
writer.next()