这是本节的多页打印视图。 点击此处打印.

返回本页常规视图.

用户定义的解析器

解析器将接收字节流,并将相应的元组序列传递到 Vertica 加载进程。您可以使用用户定义的解析器函数解析以下数据:

  • Vertica 内置解析器无法理解其格式的数据。

  • 所需的控制比内置解析器提供的控制更精确的数据。

例如,您可以使用特定的 CSV 库加载 CSV 文件。请参阅 Vertica SDK 查看两个 CSV 示例。

COPY 支持单个用户定义的解析器,您可以将其与用户定义的源以及零个或多个用户定义的筛选器实例一起使用。如果您实施 UDParser 类,还必须实施相应的 ParserFactory

有时可以通过添加块分割器来提高解析器的性能。块分割器可拆分输入并使用多个线程来解析输入。块分割器仅在 C++ API 中可用。有关详细信息,请参阅“协作解析”和“UDChunker 类”。在某些特殊情况下,您可以通过使用*分摊加载*进一步提高性能,当使用此方法时,输入由多个 Vertica 节点进行解析。

1 - UDParser 类

如果需要解析 COPY 语句的原生解析器无法处理的数据格式,您可以将 UDParser 类子类化。

在解析器执行期间,Vertica 会始终调用三个方法:setup()process()destroy()。它还可能调用 getRejectedRecord()

UDParser 构造函数

UDParser 类可执行对所有子类来说必不可少的初始化,包括初始化解析器使用的 StreamWriter 对象。因此,构造函数必须调用 super()

UDParser 方法

您的 UDParser 子类必须覆盖 process()processWithMetadata()

  • process() 将原始输入流作为一个大文件读取。如果有任何错误或故障,整个加载将失败。您可以实施具有源或筛选器(可实施 processWithMetadata())的 process(),但这可能会导致解析错误。
    您可以在上游源或筛选器实施 processWithMetadata() 时实施 process(),但可能会导致解析错误。

  • processWithMetadata() 当数据源具有以某种结构化格式(与数据有效负载分开)提供的关于记录边界的元数据时很有用。使用此接口,源除了发出数据之外,还会为每个记录发出记录长度。

    通过在每个阶段实施 processWithMetadata() 而不是 process(),您可以在整个加载堆栈中保留此记录长度元数据,从而实施更有效的解析,以在每个消息的基础上而不是每个文件或每个源的基础上从错误中恢复。 KafkaSource 和 Kafka 解析器(KafkaAvroParserKafkaJSONParserKafkaParser)会在各条 Kafka 消息损坏时使用这种机制为拒绝每条 Kafka 消息提供支持。

此外,您必须覆盖 getRejectedRecord() 以返回有关被拒绝记录的信息。

或者,您可以覆盖其他 UDParser 类方法。

解析器执行

以下各节详细说明了每次调用用户定义的解析器时的执行序列。以下示例覆盖 process() 方法。

设置

COPY 在第一次调用 process() 之前调用了 setup()。使用 setup() 可执行任何初始设置任务,以使解析器能够解析数据。此设置包括从类上下文结构检索参数,或初始化数据结构以在过滤期间使用。在第一次调用 process() 方法之前,Vertica 会先调用此方法。您的对象可能在使用期间已损坏并重新创建,因此请确保您的对象可以重新启动。

解析

COPY 将在查询执行期间重复调用 process()。Vertica 会向此方法传递数据缓冲区以解析为列和行以及由 InputState 定义的以下输入状态之一:

  • OK:当前在流的开头或中间

  • END_OF_FILE:无更多数据可用。

  • END_OF_CHUNK:当前数据在记录边界处结束且解析器应在返回之前用完所有数据。此输入状态仅在使用块分割器时出现。

  • START_OF_PORTION:输入的起始位置不是源的开头。解析器应查找第一个记录结束标记。此输入状态仅在使用分摊加载时出现。您可以使用 getPortion() 方法访问相应部分的偏移量和大小。

  • END_OF_PORTION:源已到达其部分的结尾。解析器应完成处理它开始的最后一条记录且不再前进。此输入状态仅在使用分摊加载时出现。

解析器必须拒绝其无法解析的任何数据,以便 Vertica 可以报告拒绝并将拒绝的数据写入到文件中。

process() 方法必须从输入缓冲区解析尽可能多的数据。输入缓冲区可能不在行边界结束。因此,该方法可能不得不在输入行的中间停止解析并请求更多数据。如果源文件包含 null 字节,则输入可以包含 null 字节,而且不会自动以 null 终止。

解析器具有关联的 StreamWriter 对象,数据写入实际上由此对象执行。当解析器提取列值后,它会对 StreamWriter 使用特定于类型的方法之一以将该值写入到输出流。有关这些方法的详细信息,请参阅写入数据

process() 的一次调用可能会写入多行数据。当解析器结束对数据行的处理后,它必须对 StreamWriter 调用 next() 以使输出流前进到新行。(通常,解析器会完成处理一行,因为它遇到了行尾标记。)

process() 方法到达缓冲区结尾时,它会通过返回由 StreamState 定义的以下值之一来告知 Vertica 其当前状态:

  • INPUT_NEEDED:解析器已到达缓冲区的结尾且需要获取更多可供解析的数据。

  • DONE:解析器已到达输入数据流的结尾。

  • REJECT:解析器已拒绝所读取的最后一行数据(请参阅拒绝行)。

分解

COPY 将在最后一次调用 process() 之后调用 destroy()。此方法可释放由 setup()process() 方法预留的任何资源。

process() 方法指示它已完成对数据源的解析之后,Vertica 将调用此方法。但是,有时可能会剩余尚未处理的数据源。在这种情况下,Vertica 可能会在稍后再次对该对象调用 setup(),并让此方法解析新的数据流中的数据。因此,请编写 destroy() 方法,以使 UDParser 子类的实例处于某种状态,从而可以安全地再次调用 setup()

报告拒绝

如果 process() 拒绝某行,Vertica 将调用 getRejectedRecord() 以报告该拒绝。通常,此方法将返回 RejectedRecord 类的实例和拒绝的行的详细信息。

写入数据

解析器具有关联的 StreamWriter 对象,您可以通过调用 getStreamWriter() 来访问此对象。在实施 process() 时,对 StreamWriter 对象使用 setType() 方法可将行中的值写入到特定的列索引。请验证写入的数据类型是否与架构所需的数据类型匹配。

以下示例显示了如何将类型为 long 的值写入到当前行中的第四列(索引 3):

StreamWriter writer = getStreamWriter();
...
writer.setLongValue(3, 98.6);

StreamWriter 为所有基本类型提供多种方法,例如 setBooleanValue()setStringValue() 等。有关 StreamWriter 方法的完整列表,包括使用基元类型的选项或将条目显式设置为 null 的选项,请参阅 API 文档。

拒绝行

如果解析器发现无法解析的数据,它应按照以下过程拒绝该行:

  1. 保存有关拒绝的行数据的详细信息和拒绝原因。这些信息片段可以直接存储在 RejectedRecord 对象中,或者也可以在需要使用之前将其存储在 UDParser 子类上的字段中。

  2. 通过更新 input.offset 来更新该行在输入缓冲区中的位置,以便可以从下一行继续解析。

  3. 通过返回值 StreamState.REJECT 来指示它已拒绝某个行。

  4. 返回 RejectedRecord 类的实例和有关拒绝的行的详细信息。

拆分大型负载

Vertica 提供了两种拆分大型负载的方法。 分摊加载 允许您在多个数据库节点之间分配负载。 协作解析 (仅限 C++)允许您在一个节点上的多个线程之间分配负载。

API

UDParser API 提供了以下通过子类扩展的方法:

virtual void setup(ServerInterface &srvInterface, SizedColumnTypes &returnType);

virtual bool useSideChannel();

virtual StreamState process(ServerInterface &srvInterface, DataBuffer &input, InputState input_state)=0;

virtual StreamState processWithMetadata(ServerInterface &srvInterface, DataBuffer &input,
                            LengthBuffer &input_lengths, InputState input_state)=0;

virtual void cancel(ServerInterface &srvInterface);

virtual void destroy(ServerInterface &srvInterface, SizedColumnTypes &returnType);

virtual RejectedRecord getRejectedRecord();

UDParser API 提供了以下通过子类扩展的方法:

public void setup(ServerInterface srvInterface, SizedColumnTypes returnType)
    throws UdfException;

public abstract StreamState process(ServerInterface srvInterface,
                DataBuffer input, InputState input_state)
    throws UdfException, DestroyInvocation;

protected void cancel(ServerInterface srvInterface);

public void destroy(ServerInterface srvInterface, SizedColumnTypes returnType)
    throws UdfException;

public RejectedRecord getRejectedRecord() throws UdfException;

UDParser 使用 StreamWriter 写入其输出。 StreamWriter 为所有基本类型提供多种方法,例如 setBooleanValue()setStringValue() 等。在 Java API 中,该类还提供可自动设置数据类型的 setValue() 方法。

前述方法可写入单个列值。 StreamWriter 还提供用于从映射写入完整行的方法。setRowFromMap() 方法使用列名称和值的映射,并将所有值写入到相应的列。此方法不定义新列,而是仅将值写入到现有列。JsonParser 示例使用此方法写入任意 JSON 输入。(请参阅Java 示例:JSON 解析器。)

setRowsFromMap() 还会使用提供的完整映射填充 Flex 表(请参阅 Flex 表)的任何 VMap ('raw') 列。在大多数情况下,setRowsFromMap() 是用于填充弹性表的适当方法。但是,您也可以使用 setVMap()(类似于其他 setValue() 方法)在指定的列中生成 VMap 值。

setRowFromMap() 方法可自动强制将输入值转换为使用关联的 TypeCoercion 为这些列定义的类型。在大多数情况下,使用默认实施 (StandardTypeCoercion) 都是合适的。

TypeCoercion 使用策略来控制其行为。例如,FAIL_INVALID_INPUT_VALUE 策略指示将无效输入视为错误,而非使用 null 值。系统会捕获错误,并将其作为拒绝进行处理(请参阅用户定义的解析器中的“拒绝行”)。策略还能控制是否截断太长的输入。对解析器的 TypeCoercion 使用 setPolicy() 方法可设置策略。有关支持的值,请参阅 API 文档。

除了设置这些策略之外,您可能还需要自定义类型强制转换。要执行此操作,请子类化提供的 TypeCoercion 实施之一,并覆盖 asType() 方法。如果解析器将读取来自第三方库的对象,则有必要进行此自定义。例如,用于处理地理坐标的解析器可以覆盖 asLong,以将诸如“40.4397N”等输入转换为数字。有关实施的列表,请参阅 Vertica API 文档。

UDParser API 提供了以下通过子类扩展的方法:


class PyUDParser(vertica_sdk.UDSource):
    def __init__(self):
        pass
    def setup(srvInterface, returnType):
        pass
    def process(self, srvInterface, inputbuffer, inputstate, streamwriter):
        # User implement process function.
        # User reads data from inputbuffer and parse the data.
        # Rows are emitted via the streamwriter argument
        return StreamState.DONE

在 Python 中,process() 方法需要输入缓冲区和输出缓冲区(请参阅 InputBuffer API 和 OutputBuffer API)。输入缓冲区表示您要解析的信息源。输出缓冲区会将筛选后的信息传递给 Vertica。

如果筛选器拒绝记录,请使用方法 REJECT() 来识别被拒绝的数据和拒绝原因。

2 - UDChunker 类

您可以子类化 UDChunker 类以允许解析器支持 协作解析。此类仅在 C++ API 中可用。

从根本上说,UDChunker 是一个非常简单的解析器。与 UDParser 一样,它包含以下三个方法:setup()process()destroy()。您必须覆盖 process(),而且可以覆盖其他方法。此类包含一个额外的方法 alignPortion(),您必须在要为 UDChunker 启用 分摊加载时实施该方法。

设置和分解

UDParser 一样,您可以为块分割器定义初始化和清理代码。Vertica 会在第一次调用 process() 之前先调用 setup(),并在最后一次调用 process() 之后才调用 destroy()。您的对象可能会在多个加载源之间重用,因此请确保 setup() 会完全初始化所有字段。

分块

Vertica 会调用 process() 以将输入拆分为可以独立解析的块。该方法会采用输入缓冲区和输入状态指示器:

  • OK:输入缓冲区从流的开头或中间位置开始。

  • END_OF_FILE:无更多数据可用。

  • END_OF_PORTION:源已到达其部分的结尾。此状态仅在使用分摊加载时出现。

如果输入状态为 END_OF_FILE,则块分割器应将 input.offset 标记设置为 input.size 并返回 DONE。返回 INPUT_NEEDED 是错误的。

如果输入状态为 OK,则块分割器应从输入缓冲区读取数据并找到记录边界。如果它找到至少一条记录的结尾,它应将 input.offset 标记与缓冲区中最后一条记录结尾之后的字节对齐并返回 CHUNK_ALIGNED。例如,如果输入是“abc~def”并且“~”是记录终止符,则此方法应将 input.offset 设置为 4,即“d”的位置。如果 process() 在没有找到记录边界的情况下到达输入的结尾,则应返回 INPUT_NEEDED

您可以将输入拆分成更小的块,但使用输入中的所有可用记录可以提高性能。例如,块分割器可以从输入的结尾向后扫描以找到记录终止符(它可能是输入中诸多记录的最后一条),并将其作为一个块全部返回,而不扫描剩余输入。

如果输入状态为 END_OF_PORTION,则块分割器的行为应与输入状态 OK 的行为相同,只不过它还应设置一个标记。再次调用时,它应在下一部分中找到第一条记录,并将块与该记录对齐。

输入数据可能包含 null 字节(如果源文件包含这种字节)。输入实参不会自动以 null 值终止。

process() 方法不得被无限期阻止。如果此方法在很长一段时间内无法继续执行,它应返回 KEEP_GOING。未能返回 KEEP_GOING 将导致多种后果,例如导致用户无法取消查询。

有关使用分块的 process() 方法的示例,请参阅 C++ 示例:分隔解析器和块分割器

对齐部分

如果块分割器支持分摊加载,请实施 alignPortion() 方法。在调用 process() 之前,Vertica 会调用此方法一次或多次,以将输入偏移量与该部分中第一个完整块的开头对齐。该方法会采用输入缓冲区和输入状态指示器:

  • START_OF_PORTION:缓冲区的开头对应于该部分的开头。可以使用 getPortion() 方法来访问该部分的偏移量和大小。

  • OK:输入缓冲区在相应部分的中间。

  • END_OF_PORTION:缓冲区的结尾对应于相应部分的结尾或超出相应部分的结尾。

  • END_OF_FILE:无更多数据可用。

该方法应从缓冲区的开头扫描到第一个完整记录的开头。它应该将 input.offset 设置为此位置并返回以下值之一:

  • DONE(如果找到块)。 input.offset 为块的第一个字节。

  • INPUT_NEEDED(如果输入缓冲区不包含任何块的开头)。从 END_OF_FILE 的输入状态返回此值是错误的。

  • REJECT(如果相应部分而非缓冲区不包含任何块的开头)。

API

UDChunker API 提供了以下通过子类扩展的方法:

virtual void setup(ServerInterface &srvInterface,
            SizedColumnTypes &returnType);

virtual StreamState alignPortion(ServerInterface &srvInterface,
            DataBuffer &input, InputState state);

virtual StreamState process(ServerInterface &srvInterface,
            DataBuffer &input, InputState input_state)=0;

virtual void cancel(ServerInterface &srvInterface);

virtual void destroy(ServerInterface &srvInterface,
            SizedColumnTypes &returnType);

3 - ParserFactory 类

如果编写解析器,您还必须编写用于生成解析器实例的工厂。为此,请对 ParserFactory 类设置子类。

解析器工厂是单例。您的子类必须为无状态子类,没有包含数据的字段。子类还不得修改任何全局变量。

ParserFactory 类定义了以下方法。您的子类必须覆盖 prepare() 方法。可以覆盖其他方法。

设置

Vertica 将在启动程序节点上调用 plan() 一次,以执行下列任务:

  • 检查已从 COPY 语句中的函数调用传递的任何参数和错误消息(如果出现任何问题)。您通过从传递至 plan() 方法的 ServerInterface 的实例中获得 ParamReader 对象读取参数。

  • 存储各个主机解析数据所需的任何信息。例如,可以将参数存储在通过 planCtxt 参数传入的 PlanContext 实例中。plan() 方法仅在启动程序节点上运行,而 prepareUDSources() 方法则在每个读取数据源的主机上运行。因此,该对象是它们之间的唯一通信方式。

    您通过从 getWriter() 方法中获取 ParamWriter 对象在 PlanContext 中存储数据。然后,通过对 ParamWriter 调用方法(比如 setString)写入参数。

创建解析器

Vertica 将在每个节点上调用 prepare(),以使用由 plan() 方法存储的数据来创建并初始化解析器。

定义参数

实施 getParameterTypes() 可定义解析器所使用的参数的名称和类型。Vertica 使用此信息对调用人员发出有关未知或缺失参数的警告。Vertica 会忽略未知参数并为缺失参数使用默认值。当您需要为函数定义类型和参数时,您不需要覆盖此方法。

定义解析器输出

实施 getParserReturnType() 可定义解析器输出的表列的数据类型。如果适用,getParserReturnType() 还可以定义数据类型的大小、精度和小数位数。通常,此方法从 argTypeperColumnParamReader 参数读取输出表的数据类型,并验证是否能够输出相应的数据类型。如果 getParserReturnType() 已准备好输出数据类型,则它会对 returnType 参数中传递的 SizedColumnTypes 对象调用方法。除了输出列的数据类型之外,方法还应指定有关列的数据类型的任何附加信息:

  • 对于二进制和字符串数据类型(例如,CHAR、VARCHAR 和 LONG VARBINARY),请指定其最大长度。

  • 对于 NUMERIC 类型,请指定其精度和小数位数。

  • 对于 Time/Timestamp 类型(无论是否带有时区),请指定其精度(-1 表示未指定)。

  • 对于 ARRAY 类型,请指定元素的最大数量。

  • 对于所有其他类型,无需指定长度或精度。

支持协作解析

要支持协作解析,请实施 prepareChunker() 并返回 UDChunker 子类的实例。如果 isChunkerApportionable() 返回 true,则此方法返回 null 是错误的。

目前仅在 C++ API 中支持协作解析。

支持分摊加载

要支持分摊加载,解析器、块分割器或这两者都必须支持分摊。要指示解析器可以分摊加载,请实施 isParserApportionable() 并返回 true。要指示块分割器可以分摊加载,请实施 isChunkerApportionable() 并返回 true

isChunkerApportionable() 方法会将 ServerInterface 作为实参,因此您可以访问 COPY 语句中提供的参数。例如,如果用户可以指定记录分隔符,您可能需要此类信息。当且仅当工厂可以为此输入创建块分割器时,此方法将返回 true

API

ParserFactory API 提供了以下通过子类扩展的方法:

virtual void plan(ServerInterface &srvInterface, PerColumnParamReader &perColumnParamReader, PlanContext &planCtxt);

virtual UDParser * prepare(ServerInterface &srvInterface, PerColumnParamReader &perColumnParamReader,
            PlanContext &planCtxt, const SizedColumnTypes &returnType)=0;

virtual void getParameterType(ServerInterface &srvInterface, SizedColumnTypes &parameterTypes);

virtual void getParserReturnType(ServerInterface &srvInterface, PerColumnParamReader &perColumnParamReader,
            PlanContext &planCtxt, const SizedColumnTypes &argTypes,
            SizedColumnTypes &returnType);

virtual bool isParserApportionable();

// C++ API only:
virtual bool isChunkerApportionable(ServerInterface &srvInterface);

virtual UDChunker * prepareChunker(ServerInterface &srvInterface, PerColumnParamReader &perColumnParamReader,
            PlanContext &planCtxt, const SizedColumnTypes &returnType);

如果要使用 分摊加载 将单个输入拆分成多个加载流,请实施 isParserApportionable() 和/或 isChunkerApportionable() 并返回 true。即使这些方法返回 true,也不能保证 Vertica 分摊加载。然而,如果这两个方法均返回 false,则表示解析器不会尝试这么做。

如果要使用 协作解析,请实施 prepareChunker() 并返回 UDChunker 子类的实例。仅 C++ API 支持协作解析。

Vertica 将为非隔离功能调用 prepareChunker() 方法。在隔离模式下使用函数时,此方法不可用。

如果您希望块分割器可用于分摊加载,请实施 isChunkerApportionable() 并返回 true

创建 ParserFactory 之后,您必须将其注册到 RegisterFactory 宏。

ParserFactory API 提供了以下通过子类扩展的方法:

public void plan(ServerInterface srvInterface, PerColumnParamReader perColumnParamReader, PlanContext planCtxt)
    throws UdfException;

public abstract UDParser prepare(ServerInterface srvInterface, PerColumnParamReader perColumnParamReader,
                PlanContext planCtxt, SizedColumnTypes returnType)
    throws UdfException;

public void getParameterType(ServerInterface srvInterface, SizedColumnTypes parameterTypes);

public void getParserReturnType(ServerInterface srvInterface, PerColumnParamReader perColumnParamReader,
                PlanContext planCtxt, SizedColumnTypes argTypes, SizedColumnTypes returnType)
    throws UdfException;

ParserFactory API 提供了以下通过子类扩展的方法:

class PyParserFactory(vertica_sdk.SourceFactory):
    def __init__(self):
        pass
    def plan(self):
        pass
    def prepareUDSources(self, srvInterface):
        # User implement the function to create PyUDParser.
        pass

4 - C++ 示例: BasicIntegerParser

BasicIntegerParser 示例解析由非数字字符分隔的整数字符串。有关使用持续加载的此解析器的版本,请参阅 C++ 示例: ContinuousIntegerParser

加载和使用示例

按如下所示加载并使用 BasicIntegerParser 示例。

=> CREATE LIBRARY BasicIntegerParserLib AS '/home/dbadmin/BIP.so';

=> CREATE PARSER BasicIntegerParser AS
LANGUAGE 'C++' NAME 'BasicIntegerParserFactory' LIBRARY BasicIntegerParserLib;

=> CREATE TABLE t (i integer);

=> COPY t FROM stdin WITH PARSER BasicIntegerParser();
0
1
2
3
4
5
\.

实施

BasicIntegerParser 类仅实施 API 中的 process() 方法。(它还实施了一个用于类型转换的 helper 方法。)此方法处理每一行输入,查找每一行的数字。当它前进到新行时,它会移动 input.offset 标记并检查输入状态。然后它写入输出。

    virtual StreamState process(ServerInterface &srvInterface, DataBuffer &input,
                InputState input_state) {
        // WARNING: This implementation is not trying for efficiency.
        // It is trying for simplicity, for demonstration purposes.

        size_t start = input.offset;
        const size_t end = input.size;

        do {
            bool found_newline = false;
            size_t numEnd = start;
            for (; numEnd < end; numEnd++) {
                if (input.buf[numEnd] < '0' || input.buf[numEnd] > '9') {
                    found_newline = true;
                    break;
                }
            }

            if (!found_newline) {
                input.offset = start;
                if (input_state == END_OF_FILE) {
                    // If we're at end-of-file,
                    // emit the last integer (if any) and return DONE.
                    if (start != end) {
                        writer->setInt(0, strToInt(input.buf + start, input.buf + numEnd));
                        writer->next();
                    }
                    return DONE;
                } else {
                    // Otherwise, we need more data.
                    return INPUT_NEEDED;
                }
            }

            writer->setInt(0, strToInt(input.buf + start, input.buf + numEnd));
            writer->next();

            start = numEnd + 1;
        } while (true);
    }
};

在工厂中,plan() 方法是无操作的;没有要检查的参数。prepare() 方法使用 SDK 提供的宏实例化解析器:

    virtual UDParser* prepare(ServerInterface &srvInterface,
            PerColumnParamReader &perColumnParamReader,
            PlanContext &planCtxt,
            const SizedColumnTypes &returnType) {

        return vt_createFuncObject<BasicIntegerParser>(srvInterface.allocator);
    }

getParserReturnType() 方法声明了单个输出:

    virtual void getParserReturnType(ServerInterface &srvInterface,
            PerColumnParamReader &perColumnParamReader,
            PlanContext &planCtxt,
            const SizedColumnTypes &argTypes,
            SizedColumnTypes &returnType) {
        // We only and always have a single integer column
        returnType.addInt(argTypes.getColumnName(0));
    }

对于所有用 C++ 编写的 UDx,该示例以注册其工厂结束:

RegisterFactory(BasicIntegerParserFactory);

5 - C++ 示例: ContinuousIntegerParser

ContinuousIntegerParser 示例是 BasicIntegerParser 的变体。这两个示例都从输入字符串中解析整数。 ContinuousIntegerParser 使用 连续加载 读取数据。

加载和使用示例

按如下所示加载 ContinuousIntegerParser 示例。

=> CREATE LIBRARY ContinuousIntegerParserLib AS '/home/dbadmin/CIP.so';

=> CREATE PARSER ContinuousIntegerParser AS
LANGUAGE 'C++' NAME 'ContinuousIntegerParserFactory'
LIBRARY ContinuousIntegerParserLib;

以使用 BasicIntegerParser 的同样方式使用它。请参阅C++ 示例: BasicIntegerParser

实施

ContinuousIntegerParserContinuousUDParser 的子类。ContinuousUDParser 的子类将处理逻辑放在 run() 方法中。

    virtual void run() {

        // This parser assumes a single-column input, and
        // a stream of ASCII integers split by non-numeric characters.
        size_t pos = 0;
        size_t reserved = cr.reserve(pos+1);
        while (!cr.isEof() || reserved == pos + 1) {
            while (reserved == pos + 1 && isdigit(*ptr(pos))) {
                pos++;
                reserved = cr.reserve(pos + 1);
            }

            std::string st(ptr(), pos);
            writer->setInt(0, strToInt(st));
            writer->next();

            while (reserved == pos + 1 && !isdigit(*ptr(pos))) {
                pos++;
                reserved = cr.reserve(pos + 1);
            }
            cr.seek(pos);
            pos = 0;
            reserved = cr.reserve(pos + 1);
        }
    }
};

有关 ContinuousUDParser 的更复杂示例,请参阅示例中的 ExampleDelimitedParser。(请参阅下载并运行 UDx 示例代码。) ExampleDelimitedParser 使用块分割器;请参阅 C++ 示例:分隔解析器和块分割器

6 - Java 示例:数字文本

NumericTextParser 示例可解析以单词而非数字表示的整数值(例如,"one two three" 代表一百二十三)。解析器将执行下列操作:

  • 接受单个参数,以设置用来分隔数据行中各列的字符。分隔符默认设置为管道 (|) 字符。

  • 忽略额外空格和用于表示数字的单词的首字母大写。

  • 使用以下单词标识数字:zero、one、two、three、four、five、six、seven、eight 和 nine。

  • 假设表示整数的单词至少用一个空格分隔。

  • 拒绝无法完全解析为整数的任何数据行。

  • 在输出表包含非整数列时生成错误。

加载和使用示例

按如下所示加载并使用解析器:

=> CREATE LIBRARY JavaLib AS '/home/dbadmin/JavaLib.jar' LANGUAGE 'JAVA';
CREATE LIBRARY

=> CREATE PARSER NumericTextParser AS LANGUAGE 'java'
->    NAME 'com.myCompany.UDParser.NumericTextParserFactory'
->    LIBRARY JavaLib;
CREATE PARSER FUNCTION
=> CREATE TABLE t (i INTEGER);
CREATE TABLE
=> COPY t FROM STDIN WITH PARSER NumericTextParser();
Enter data to be copied followed by a newline.
End with a backslash and a period on a line by itself.
>> One
>> Two
>> One Two Three
>> \.
=> SELECT * FROM t ORDER BY i;
  i
-----
   1
   2
 123
(3 rows)

=> DROP TABLE t;
DROP TABLE
=> -- Parse multi-column input
=> CREATE TABLE t (i INTEGER, j INTEGER);
CREATE TABLE
=> COPY t FROM stdin WITH PARSER NumericTextParser();
Enter data to be copied followed by a newline.
End with a backslash and a period on a line by itself.
>> One | Two
>> Two | Three
>> One Two Three | four Five Six
>> \.
=> SELECT * FROM t ORDER BY i;
  i  |  j
-----+-----
   1 |   2
   2 |   3
 123 | 456
(3 rows)

=> TRUNCATE TABLE t;
TRUNCATE TABLE
=> -- Use alternate separator character
=> COPY t FROM STDIN WITH PARSER NumericTextParser(separator='*');
Enter data to be copied followed by a newline.
End with a backslash and a period on a line by itself.
>> Five * Six
>> seven * eight
>> nine * one zero
>> \.
=> SELECT * FROM t ORDER BY i;
 i | j
---+----
 5 |  6
 7 |  8
 9 | 10
(3 rows)

=> TRUNCATE TABLE t;
TRUNCATE TABLE

=> -- Rows containing data that does not parse into digits is rejected.
=> DROP TABLE t;
DROP TABLE
=> CREATE TABLE t (i INTEGER);
CREATE TABLE
=> COPY t FROM STDIN WITH PARSER NumericTextParser();
Enter data to be copied followed by a newline.
End with a backslash and a period on a line by itself.
>> One Zero Zero
>> Two Zero Zero
>> Three Zed Zed
>> Four Zero Zero
>> Five Zed Zed
>> \.
SELECT * FROM t ORDER BY i;
  i
-----
 100
 200
 400
(3 rows)

=> -- Generate an error by trying to copy into a table with a non-integer column
=> DROP TABLE t;
DROP TABLE
=> CREATE TABLE t (i INTEGER, j VARCHAR);
CREATE TABLE
=> COPY t FROM STDIN WITH PARSER NumericTextParser();
vsql:UDParse.sql:94: ERROR 3399:  Failure in UDx RPC call
InvokeGetReturnTypeParser(): Error in User Defined Object [NumericTextParser],
error code: 0
com.vertica.sdk.UdfException: Column 2 of output table is not an Int
        at com.myCompany.UDParser.NumericTextParserFactory.getParserReturnType
        (NumericTextParserFactory.java:70)
        at com.vertica.udxfence.UDxExecContext.getReturnTypeParser(
        UDxExecContext.java:1464)
        at com.vertica.udxfence.UDxExecContext.getReturnTypeParser(
        UDxExecContext.java:768)
        at com.vertica.udxfence.UDxExecContext.run(UDxExecContext.java:236)
        at java.lang.Thread.run(Thread.java:662)

解析器实施

以下代码可实施解析器。

package com.myCompany.UDParser;

import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

import com.vertica.sdk.DataBuffer;
import com.vertica.sdk.DestroyInvocation;
import com.vertica.sdk.RejectedRecord;
import com.vertica.sdk.ServerInterface;
import com.vertica.sdk.State.InputState;
import com.vertica.sdk.State.StreamState;
import com.vertica.sdk.StreamWriter;
import com.vertica.sdk.UDParser;
import com.vertica.sdk.UdfException;

public class NumericTextParser extends UDParser {

    private String separator; // Holds column separator character

    // List of strings that we accept as digits.
    private List<String> numbers = Arrays.asList("zero", "one",
            "two", "three", "four", "five", "six", "seven",
            "eight", "nine");

    // Hold information about the last rejected row.
    private String rejectedReason;
    private String rejectedRow;

    // Constructor gets the separator character from the Factory's prepare()
    // method.
    public NumericTextParser(String sepparam) {
        super();
        this.separator = sepparam;
    }

    // Called to perform the actual work of parsing. Gets a buffer of bytes
    // to turn into tuples.
    @Override
    public StreamState process(ServerInterface srvInterface, DataBuffer input,
            InputState input_state) throws UdfException, DestroyInvocation {

        int i=input.offset; // Current position in the input buffer
        // Flag to indicate whether we just found the end of a row.
        boolean lastCharNewline = false;
        // Buffer to hold the row of data being read.
        StringBuffer line = new StringBuffer();

        //Continue reading until end of buffer.
        for(; i < input.buf.length; i++){
            // Loop through input until we find a linebreak: marks end of row
            char inchar = (char) input.buf[i];
            // Note that this isn't a robust way to find rows. It should
            // accept a user-defined row separator. Also, the following
            // assumes ASCII line break metheods, which isn't a good idea
            // in the UTF world. But it is good enough for this simple example.
            if (inchar != '\n' && inchar != '\r') {
                // Keep adding to a line buffer until a full row of data is read
                line.append(inchar);
                lastCharNewline = false; // Last character not a new line
            } else {
                // Found a line break. Process the row.
                lastCharNewline = true; // indicate we got a complete row
                // Update the position in the input buffer. This is updated
                // whether the row is successfully processed or not.
                input.offset = i+1;
                // Call procesRow to extract values and write tuples to the
                // output. Returns false if there was an error.
                if (!processRow(line)) {
                    // Processing row failed. Save bad row to rejectedRow field
                    // and return to caller indicating a rejected row.
                    rejectedRow = line.toString();
                    // Update position where we processed the data.
                    return StreamState.REJECT;
                }
                line.delete(0, line.length()); // clear row buffer
            }
        }

        // At this point, process() has finished processing the input buffer.
        // There are two possibilities: need to get more data
        // from the input stream to finish processing, or there is
        // no more data to process. If at the end of the input stream and
        // the row was not terminated by a linefeed, it may need
        // to process the last row.

        if (input_state == InputState.END_OF_FILE && lastCharNewline) {
            // End of input and it ended on a newline. Nothing more to do
            return StreamState.DONE;
        } else if (input_state == InputState.END_OF_FILE && !lastCharNewline) {
            // At end of input stream but didn't get a final newline. Need to
            // process the final row that was read in, then exit for good.
            if (line.length() == 0) {
                // Nothing to process. Done parsing.
                return StreamState.DONE;
            }
            // Need to parse the last row, not terminated by a linefeed. This
            // can occur if the file being read didn't have a final line break.
            if (processRow(line)) {
                return StreamState.DONE;
            } else {
                // Processing last row failed. Save bad row to rejectedRow field
                // and return to caller indicating a rejected row.
                rejectedRow = line.toString();
                // Tell Vertica the entire buffer was processed so it won't
                // call again to have the line processed.
                input.offset = input.buf.length;
                return StreamState.REJECT;
            }
        } else {
            // Stream is not fully read, so tell Vertica to send more. If
            // process() didn't get a complete row before it hit the end of the
            // input buffer, it will end up re-processing that segment again
            // when more data is added to the buffer.
            return StreamState.INPUT_NEEDED;
        }
    }

    // Breaks a row into columns, then parses the content of the
    // columns. Returns false if there was an error parsing the
    // row, in which case it sets the rejected row to the input
    // line. Returns true if the row was successfully read.
    private boolean processRow(StringBuffer line)
                                throws UdfException, DestroyInvocation {
        String[] columns = line.toString().split(Pattern.quote(separator));
        // Loop through the columns, decoding their contents
        for (int col = 0; col < columns.length; col++) {
            // Call decodeColumn to extract value from this column
            Integer colval = decodeColumn(columns[col]);
            if (colval == null) {
                // Could not parse one of the columns. Indicate row should
                // be rejected.
                return false;
            }
            // Column parsed OK. Write it to the output. writer is a field
            // provided by the parent class. Since this parser only accepts
            // integers, there is no need to verify that data type of the parsed
            // data matches the data type of the column being written. In your
            // UDParsers, you may want to perform this verification.
            writer.setLong(col,colval);
        }
        // Done with the row of data. Advance output to next row.

        // Note that this example does not verify that all of the output columns
        // have values assigned to them. If there are missing values at the
        // end of a row, they get automatically get assigned a default value
        // (0 for integers). This isn't a robust solution. Your UDParser
        // should perform checks here to handle this situation and set values
        // (such as null) when appropriate.
        writer.next();
        return true; // Successfully processed the row.
    }

    // Gets a string with text numerals, i.e. "One Two Five Seven" and turns
    // it into an integer value, i.e. 1257. Returns null if the string could not
    // be parsed completely into numbers.
    private Integer decodeColumn(String text) {
        int value = 0; // Hold the value being parsed.

        // Split string into individual words. Eat extra spaces.
        String[] words = text.toLowerCase().trim().split("\\s+");

        // Loop through the words, matching them against the list of
        // digit strings.
        for (int i = 0; i < words.length; i++) {
            if (numbers.contains(words[i])) {
                // Matched a digit. Add the it to the value.
                int digit = numbers.indexOf(words[i]);
                value = (value * 10) + digit;
            } else {
                // The string didn't match one of the accepted string values
                // for digits. Need to reject the row. Set the rejected
                // reason string here so it can be incorporated into the
                // rejected reason object.
                //
                // Note that this example does not handle null column values.
                // In most cases, you want to differentiate between an
                // unparseable column value and a missing piece of input
                // data. This example just rejects the row if there is a missing
                // column value.
                rejectedReason = String.format(
                        "Could not parse '%s' into a digit",words[i]);
                return null;
            }
        }
        return value;
    }

    // Vertica calls this method if the parser rejected a row of data
    // to find out what went wrong and add to the proper logs. Just gathers
    // information stored in fields and returns it in an object.
    @Override
    public RejectedRecord getRejectedRecord() throws UdfException {
        return new RejectedRecord(rejectedReason,rejectedRow.toCharArray(),
                rejectedRow.length(), "\n");
    }
}

ParserFactory 实施

以下代码可实施解析器工厂。

NumericTextParser 接受名为 separator 的单个可选参数。您可以在 getParameterType() 方法中定义此参数,而 plan() 方法可存储此参数的值。 NumericTextParser 仅输出整数值。因此,如果输出表包含数据类型是非整数的列,getParserReturnType() 方法将抛出异常。

package com.myCompany.UDParser;

import java.util.regex.Pattern;

import com.vertica.sdk.ParamReader;
import com.vertica.sdk.ParamWriter;
import com.vertica.sdk.ParserFactory;
import com.vertica.sdk.PerColumnParamReader;
import com.vertica.sdk.PlanContext;
import com.vertica.sdk.ServerInterface;
import com.vertica.sdk.SizedColumnTypes;
import com.vertica.sdk.UDParser;
import com.vertica.sdk.UdfException;
import com.vertica.sdk.VerticaType;

public class NumericTextParserFactory extends ParserFactory {

    // Called once on the initiator host to check the parameters and set up the
    // context data that hosts performing processing will need later.
    @Override
    public void plan(ServerInterface srvInterface,
                PerColumnParamReader perColumnParamReader,
                PlanContext planCtxt) {

        String separator = "|"; // assume separator is pipe character

        // See if a parameter was given for column separator
        ParamReader args = srvInterface.getParamReader();
        if (args.containsParameter("separator")) {
            separator = args.getString("separator");
            if (separator.length() > 1) {
                throw new UdfException(0,
                        "Separator parameter must be a single character");
            }
            if (Pattern.quote(separator).matches("[a-zA-Z]")) {
                throw new UdfException(0,
                        "Separator parameter cannot be a letter");
            }
        }

        // Save separator character in the Plan Data
        ParamWriter context = planCtxt.getWriter();
        context.setString("separator", separator);
    }

    // Define the data types of the output table that the parser will return.
    // Mainly, this just ensures that all of the columns in the table which
    // is the target of the data load are integer.
    @Override
    public void getParserReturnType(ServerInterface srvInterface,
                PerColumnParamReader perColumnParamReader,
                PlanContext planCtxt,
                SizedColumnTypes argTypes,
                SizedColumnTypes returnType) {

        // Get access to the output table's columns
        for (int i = 0; i < argTypes.getColumnCount(); i++ ) {
            if (argTypes.getColumnType(i).isInt()) {
                // Column is integer... add it to the output
                 returnType.addInt(argTypes.getColumnName(i));
            } else {
                // Column isn't an int, so throw an exception.
                // Technically, not necessary since the
                // UDx framework will automatically error out when it sees a
                // Discrepancy between the type in the target table and the
                // types declared by this method. Throwing this exception will
                // provide a clearer error message to the user.
                String message = String.format(
                    "Column %d of output table is not an Int", i + 1);
                throw new UdfException(0, message);
            }
        }
    }

    // Instantiate the UDParser subclass named NumericTextParser. Passes the
    // separator characetr as a paramter to the constructor.
    @Override
    public UDParser prepare(ServerInterface srvInterface,
            PerColumnParamReader perColumnParamReader, PlanContext planCtxt,
            SizedColumnTypes returnType) throws UdfException {
        // Get the separator character from the context
        String separator = planCtxt.getReader().getString("separator");
        return new NumericTextParser(separator);
    }

    // Describe the parameters accepted by this parser.
    @Override
    public void getParameterType(ServerInterface srvInterface,
             SizedColumnTypes parameterTypes) {
        parameterTypes.addVarchar(1, "separator");
    }
}

7 - Java 示例:JSON 解析器

JSON 解析器会使用 JSON 对象流。每个对象都必须具有正确格式,并且必须位于输入中的一行上。可以使用换行符分隔对象。此解析器使用字段名称作为映射中的键,而这些键将成为表中的列名称。可以在 /opt/vertica/packages/flextable/examples 中找到此示例的代码。此目录还包含一个示例数据文件。

此示例使用 setRowFromMap() 方法写入数据。

加载和使用示例

使用第三方库 (gson-2.2.4.jar) 加载库并定义 JSON 解析器,如下所示。有关下载 URL,请参阅 JsonParser.java 中的注释:

=> CREATE LIBRARY json
-> AS '/opt/vertica/packages/flextable/examples/java/output/json.jar'
-> DEPENDS '/opt/vertica/bin/gson-2.2.4.jar' language 'java';
CREATE LIBRARY

=> CREATE PARSER JsonParser AS LANGUAGE 'java'
-> NAME 'com.vertica.flex.JsonParserFactory' LIBRARY json;
CREATE PARSER FUNCTION

您现在可以定义一个表,然后使用 JSON 解析器将数据加载到其中,如下所示:

=> CREATE TABLE mountains(name varchar(64), type varchar(32), height integer);
CREATE TABLE

=> COPY mountains FROM '/opt/vertica/packages/flextable/examples/mountains.json'
-> WITH PARSER JsonParser();
-[ RECORD 1 ]--
Rows Loaded | 2

=> SELECT * from mountains;
-[ RECORD 1 ]--------
name   | Everest
type   | mountain
height | 29029
-[ RECORD 2 ]--------
name   | Mt St Helens
type   | volcano
height |

该数据文件包含一个未加载的值 (hike_safety),因为表定义不包括该列。该数据文件遵循以下格式:

{ "name": "Everest", "type":"mountain", "height": 29029, "hike_safety": 34.1  }
{ "name": "Mt St Helens", "type": "volcano", "hike_safety": 15.4 }

实施

以下代码显示了 JsonParser.java 中的 process() 方法。解析器会尝试将输入读取到一个 Map. 中。如果读取成功,JSON 解析器将调用 setRowFromMap()

    @Override
    public StreamState process(ServerInterface srvInterface, DataBuffer input,
            InputState inputState) throws UdfException, DestroyInvocation {
        clearReject();
        StreamWriter output = getStreamWriter();

        while (input.offset < input.buf.length) {
            ByteBuffer lineBytes = consumeNextLine(input, inputState);

            if (lineBytes == null) {
                return StreamState.INPUT_NEEDED;
            }

            String lineString = StringUtils.newString(lineBytes);

            try {
                Map map = gson.fromJson(lineString, parseType);

                if (map == null) {
                    continue;
                }

                output.setRowFromMap(map);
                // No overrides needed, so just call next() here.
         output.next();
            } catch (Exception ex) {
                setReject(lineString, ex);
                return StreamState.REJECT;
            }
        }

JsonParserFactory.java 工厂会将解析器实例化并在 prepare() 方法中返回该解析器。您不需要执行其他设置。

8 - C++ 示例:分隔解析器和块分割器

ExampleDelimitedUDChunker 类在分隔符处划分输入。您可以将此块分割器与任何理解分隔输入的解析器一起使用。 ExampleDelimitedParser 是一个使用这个块分割器的 ContinuousUDParser 子类。

加载和使用示例

按如下所示加载并使用示例。

=> CREATE LIBRARY ExampleDelimitedParserLib AS '/home/dbadmin/EDP.so';

=> CREATE PARSER ExampleDelimitedParser AS
    LANGUAGE 'C++' NAME 'DelimitedParserFrameworkExampleFactory'
    LIBRARY ExampleDelimitedParserLib;

=> COPY t FROM stdin WITH PARSER ExampleDelimitedParser();
0
1
2
3
4
5
6
7
8
9
\.

块分割器实施

该块分割器支持分摊加载。alignPortion() 方法在当前部分找到第一个完整记录的开头,并将输入缓冲区与其对齐。记录终止符作为实参传递并在构造函数中设置。

StreamState ExampleDelimitedUDChunker::alignPortion(
            ServerInterface &srvInterface,
            DataBuffer &input, InputState state)
{
    /* find the first record terminator.  Its record belongs to the previous portion */
    void *buf = reinterpret_cast<void *>(input.buf + input.offset);
    void *term = memchr(buf, recordTerminator, input.size - input.offset);

    if (term) {
        /* record boundary found.  Align to the start of the next record */
        const size_t chunkSize = reinterpret_cast<size_t>(term) - reinterpret_cast<size_t>(buf);
        input.offset += chunkSize
            + sizeof(char) /* length of record terminator */;

        /* input.offset points at the start of the first complete record in the portion */
        return DONE;
    } else if (state == END_OF_FILE || state == END_OF_PORTION) {
        return REJECT;
    } else {
        VIAssert(state == START_OF_PORTION || state == OK);
        return INPUT_NEEDED;
    }
}

process() 方法必须考虑跨越部分边界的块。如果之前的调用在某个部分的末尾,则该方法设置一个标志。代码首先检查并处理该条件。逻辑与 alignPortion() 类似,所以示例调用它来做部分除法。

StreamState ExampleDelimitedUDChunker::process(
                ServerInterface &srvInterface,
                DataBuffer &input,
                InputState input_state)
{
    const size_t termLen = 1;
    const char *terminator = &recordTerminator;

    if (pastPortion) {
        /*
         * Previous state was END_OF_PORTION, and the last chunk we will produce
         * extends beyond the portion we started with, into the next portion.
         * To be consistent with alignPortion(), that means finding the first
         * record boundary, and setting the chunk to be at that boundary.
         * Fortunately, this logic is identical to aligning the portion (with
         * some slight accounting for END_OF_FILE)!
         */
        const StreamState findLastTerminator = alignPortion(srvInterface, input);

        switch (findLastTerminator) {
            case DONE:
                return DONE;
            case INPUT_NEEDED:
                if (input_state == END_OF_FILE) {
                    /* there is no more input where we might find a record terminator */
                    input.offset = input.size;
                    return DONE;
                }
                return INPUT_NEEDED;
            default:
                VIAssert("Invalid return state from alignPortion()");
        }
        return findLastTerminator;
    }

现在,该方法查找分隔符。如果输入从一个部分的末尾开始,它会设置标志。

    size_t ret = input.offset, term_index = 0;
    for (size_t index = input.offset; index < input.size; ++index) {
        const char c = input.buf[index];
        if (c == terminator[term_index]) {
            ++term_index;
            if (term_index == termLen) {
                ret = index + 1;
                term_index = 0;
            }
            continue;
        } else if (term_index > 0) {
            index -= term_index;
        }

        term_index = 0;
    }

    if (input_state == END_OF_PORTION) {
        /*
         * Regardless of whether or not a record was found, the next chunk will extend
         * into the next portion.
         */
        pastPortion = true;
    }

最后,process() 移动输入偏移量并返回。

    // if we were able to find some rows, move the offset to point at the start of the next (potential) row, or end of block
    if (ret > input.offset) {
        input.offset = ret;
        return CHUNK_ALIGNED;
    }

    if (input_state == END_OF_FILE) {
        input.offset = input.size;
        return DONE;
    }

    return INPUT_NEEDED;
}

工厂实施

文件 ExampleDelimitedParser.cpp 定义了一个使用此 UDChunker 的工厂。块分割器支持分摊加载,因此工厂实施 isChunkerApportionable()

    virtual bool isChunkerApportionable(ServerInterface &srvInterface) {
        ParamReader params = srvInterface.getParamReader();
        if (params.containsParameter("disable_chunker") && params.getBoolRef("d\
isable_chunker")) {
            return false;
        } else {
            return true;
        }
    }

prepareChunker() 方法创建块分割器:

    virtual UDChunker* prepareChunker(ServerInterface &srvInterface,
                                      PerColumnParamReader &perColumnParamReade\
r,
                                      PlanContext &planCtxt,
                                      const SizedColumnTypes &returnType)
    {
        ParamReader params = srvInterface.getParamReader();
        if (params.containsParameter("disable_chunker") && params.getBoolRef("d\
isable_chunker")) {
            return NULL;
        }

        std::string recordTerminator("\n");

        ParamReader args(srvInterface.getParamReader());
        if (args.containsParameter("record_terminator")) {
            recordTerminator = args.getStringRef("record_terminator").str();
        }

        return vt_createFuncObject<ExampleDelimitedUDChunker>(srvInterface.allo\
cator,
                recordTerminator[0]);
    }

9 - Python 示例:复杂类型的 JSON 解析器

以下示例详细说明了 UDParser,它接受 JSON 对象并将其解析为复杂类型。对于此示例,解析器假设输入数据是具有两个整数字段的行数组。输入记录应使用换行符进行分隔。如果 JSON 输入未指定任何行字段,则函数会将这些字段解析为 NULL。

此 UDParser 的源代码还包含一个工厂方法,用于解析具有整数和整数字段数组的行。解析器的实施与工厂中的返回类型无关,因此您可以创建具有不同返回类型且均指向 prepare() 方法中的 ComplexJsonParser() 类的工厂。完整的源代码位于 /opt/vertica/sdk/examples/python/UDParsers.py 中。

加载和使用示例

加载库并创建解析器,如下所示:


=> CREATE OR REPLACE LIBRARY UDParsers AS '/home/dbadmin/examples/python/UDParsers.py' LANGUAGE 'Python';

=> CREATE PARSER ComplexJsonParser AS LANGUAGE 'Python' NAME 'ArrayJsonParserFactory' LIBRARY UDParsers;

您现在可以定义一个表,然后使用 JSON 解析器将数据加载到其中,例如:


=> CREATE TABLE orders (a bool, arr array[row(a int, b int)]);
CREATE TABLE

=> COPY orders (arr) FROM STDIN WITH PARSER ComplexJsonParser();
[]
[{"a":1, "b":10}]
[{"a":1, "b":10}, {"a":null, "b":10}]
[{"a":1, "b":10},{"a":10, "b":20}]
[{"a":1, "b":10}, {"a":null, "b":null}]
[{"a":1, "b":2}, {"a":3, "b":4}, {"a":5, "b":6}, {"a":7, "b":8}, {"a":9, "b":10}, {"a":11, "b":12}, {"a":13, "b":14}]
\.

=> SELECT * FROM orders;
a |                                  arr
--+--------------------------------------------------------------------------
  | []
  | [{"a":1,"b":10}]
  | [{"a":1,"b":10},{"a":null,"b":10}]
  | [{"a":1,"b":10},{"a":10,"b":20}]
  | [{"a":1,"b":10},{"a":null,"b":null}]
  | [{"a":1,"b":2},{"a":3,"b":4},{"a":5,"b":6},{"a":7,"b":8},{"a":9,"b":10},{"a":11,"b":12},{"a":13,"b":14}]
(6 rows)

设置

所有 Python UDx 都必须导入 Vertica SDK 库。 ComplexJsonParser() 也需要 json 库。

import vertica_sdk
import json

工厂实施

prepare() 方法将实例化并返回解析器:


def prepare(self, srvInterface, perColumnParamReader, planCtxt, returnType):
    return ComplexJsonParser()

getParserReturnType() 声明返回类型必须是行数组,其中每个行都有两个整数字段:


def getParserReturnType(self, rvInterface, perColumnParamReader, planCtxt, argTypes, returnType):
    fieldTypes = vertica_sdk.SizedColumnTypes.makeEmpty()
    fieldTypes.addInt('a')
    fieldTypes.addInt('b')
    returnType.addArrayType(vertica_sdk.SizedColumnTypes.makeRowType(fieldTypes, 'elements'), 64, 'arr')

解析器实施

process() 方法会使用 InputBuffer 读入数据,然后在换行符处拆分该输入数据。随后,该方法会将处理后的数据传递给 writeRows() 方法。 writeRows() 会将每个数据行转换为 JSON 对象,检查该 JSON 对象的类型,然后将相应的值或对象写入输出。


class ComplexJsonParser(vertica_sdk.UDParser):

    leftover = ''

    def process(self, srvInterface, input_buffer, input_state, writer):
        input_buffer.setEncoding('utf-8')

        self.count = 0
        rec = self.leftover + input_buffer.read()
        row_lst = rec.split('\n')
        self.leftover = row_lst[-1]
        self.writeRows(row_lst[:-1], writer)
        if input_state == InputState.END_OF_FILE:
            self.writeRows([self.leftover], writer)
            return StreamState.DONE
        else:
            return StreamState.INPUT_NEEDED

    def writeRows(self, str_lst, writer):
        for s in str_lst:
            stripped = s.strip()
            if len(stripped) == 0:
                return
            elif len(stripped) > 1 and stripped[0:2] == "//":
                continue
            jsonValue = json.loads(stripped)
            if type(jsonValue) is list:
                writer.setArray(0, jsonValue)
            elif jsonValue is None:
                writer.setNull(0)
            else:
                writer.setRow(0, jsonValue)
            writer.next()