UDParser 类
如果需要解析 COPY 语句的原生解析器无法处理的数据格式,您可以将 UDParser
类子类化。
在解析器执行期间,Vertica 会始终调用三个方法:setup()
、process()
和 destroy()
。它还可能调用 getRejectedRecord()
。
UDParser 构造函数
UDParser
类可执行对所有子类来说必不可少的初始化,包括初始化解析器使用的 StreamWriter
对象。因此,构造函数必须调用 super()
。
UDParser 方法
您的 UDParser
子类必须覆盖 process()
或 processWithMetadata()
:
注意
processWithMetadata()
仅适用于以 C++ 编程语言编写的用户定义扩展 (UDx)。
-
process()
将原始输入流作为一个大文件读取。如果有任何错误或故障,整个加载将失败。您可以实施具有源或筛选器(可实施processWithMetadata()
)的process()
,但这可能会导致解析错误。
您可以在上游源或筛选器实施processWithMetadata()
时实施process()
,但可能会导致解析错误。 -
processWithMetadata()
当数据源具有以某种结构化格式(与数据有效负载分开)提供的关于记录边界的元数据时很有用。使用此接口,源除了发出数据之外,还会为每个记录发出记录长度。通过在每个阶段实施
processWithMetadata()
而不是process()
,您可以在整个加载堆栈中保留此记录长度元数据,从而实施更有效的解析,以在每个消息的基础上而不是每个文件或每个源的基础上从错误中恢复。 KafkaSource 和 Kafka 解析器(KafkaAvroParser、KafkaJSONParser 和 KafkaParser)会在各条 Kafka 消息损坏时使用这种机制为拒绝每条 Kafka 消息提供支持。注意
要实施processWithMetadata()
,您必须覆盖useSideChannel()
以返回true
。
此外,您必须覆盖 getRejectedRecord()
以返回有关被拒绝记录的信息。
或者,您可以覆盖其他 UDParser
类方法。
解析器执行
以下各节详细说明了每次调用用户定义的解析器时的执行序列。以下示例覆盖 process()
方法。
设置
COPY 在第一次调用 process()
之前调用了 setup()
。使用 setup()
可执行任何初始设置任务,以使解析器能够解析数据。此设置包括从类上下文结构检索参数,或初始化数据结构以在过滤期间使用。在第一次调用 process()
方法之前,Vertica 会先调用此方法。您的对象可能在使用期间已损坏并重新创建,因此请确保您的对象可以重新启动。
解析
COPY 将在查询执行期间重复调用 process()
。Vertica 会向此方法传递数据缓冲区以解析为列和行以及由 InputState
定义的以下输入状态之一:
-
OK
:当前在流的开头或中间 -
END_OF_FILE
:无更多数据可用。 -
END_OF_CHUNK
:当前数据在记录边界处结束且解析器应在返回之前用完所有数据。此输入状态仅在使用块分割器时出现。 -
START_OF_PORTION
:输入的起始位置不是源的开头。解析器应查找第一个记录结束标记。此输入状态仅在使用分摊加载时出现。您可以使用getPortion()
方法访问相应部分的偏移量和大小。 -
END_OF_PORTION
:源已到达其部分的结尾。解析器应完成处理它开始的最后一条记录且不再前进。此输入状态仅在使用分摊加载时出现。
解析器必须拒绝其无法解析的任何数据,以便 Vertica 可以报告拒绝并将拒绝的数据写入到文件中。
process()
方法必须从输入缓冲区解析尽可能多的数据。输入缓冲区可能不在行边界结束。因此,该方法可能不得不在输入行的中间停止解析并请求更多数据。如果源文件包含 null 字节,则输入可以包含 null 字节,而且不会自动以 null 终止。
解析器具有关联的 StreamWriter
对象,数据写入实际上由此对象执行。当解析器提取列值后,它会对 StreamWriter
使用特定于类型的方法之一以将该值写入到输出流。有关这些方法的详细信息,请参阅写入数据。
对 process()
的一次调用可能会写入多行数据。当解析器结束对数据行的处理后,它必须对 StreamWriter
调用 next()
以使输出流前进到新行。(通常,解析器会完成处理一行,因为它遇到了行尾标记。)
当 process()
方法到达缓冲区结尾时,它会通过返回由 StreamState
定义的以下值之一来告知 Vertica 其当前状态:
-
INPUT_NEEDED
:解析器已到达缓冲区的结尾且需要获取更多可供解析的数据。 -
DONE
:解析器已到达输入数据流的结尾。 -
REJECT
:解析器已拒绝所读取的最后一行数据(请参阅拒绝行)。
分解
COPY 将在最后一次调用 process()
之后调用 destroy()
。此方法可释放由 setup()
或 process()
方法预留的任何资源。
在 process()
方法指示它已完成对数据源的解析之后,Vertica 将调用此方法。但是,有时可能会剩余尚未处理的数据源。在这种情况下,Vertica 可能会在稍后再次对该对象调用 setup()
,并让此方法解析新的数据流中的数据。因此,请编写 destroy()
方法,以使 UDParser
子类的实例处于某种状态,从而可以安全地再次调用 setup()
。
报告拒绝
如果 process()
拒绝某行,Vertica 将调用 getRejectedRecord()
以报告该拒绝。通常,此方法将返回 RejectedRecord
类的实例和拒绝的行的详细信息。
写入数据
解析器具有关联的 StreamWriter
对象,您可以通过调用 getStreamWriter()
来访问此对象。在实施 process()
时,对 StreamWriter
对象使用
setType()
方法可将行中的值写入到特定的列索引。请验证写入的数据类型是否与架构所需的数据类型匹配。
以下示例显示了如何将类型为 long
的值写入到当前行中的第四列(索引 3):
StreamWriter writer = getStreamWriter();
...
writer.setLongValue(3, 98.6);
StreamWriter
为所有基本类型提供多种方法,例如 setBooleanValue()
、setStringValue()
等。有关 StreamWriter
方法的完整列表,包括使用基元类型的选项或将条目显式设置为 null 的选项,请参阅 API 文档。
拒绝行
如果解析器发现无法解析的数据,它应按照以下过程拒绝该行:
-
保存有关拒绝的行数据的详细信息和拒绝原因。这些信息片段可以直接存储在
RejectedRecord
对象中,或者也可以在需要使用之前将其存储在UDParser
子类上的字段中。 -
通过更新
input.offset
来更新该行在输入缓冲区中的位置,以便可以从下一行继续解析。 -
通过返回值
StreamState.REJECT
来指示它已拒绝某个行。 -
返回
RejectedRecord
类的实例和有关拒绝的行的详细信息。
拆分大型负载
Vertica 提供了两种拆分大型负载的方法。 分摊加载 允许您在多个数据库节点之间分配负载。 协作解析 (仅限 C++)允许您在一个节点上的多个线程之间分配负载。
API
UDParser API 提供了以下通过子类扩展的方法:
virtual void setup(ServerInterface &srvInterface, SizedColumnTypes &returnType);
virtual bool useSideChannel();
virtual StreamState process(ServerInterface &srvInterface, DataBuffer &input, InputState input_state)=0;
virtual StreamState processWithMetadata(ServerInterface &srvInterface, DataBuffer &input,
LengthBuffer &input_lengths, InputState input_state)=0;
virtual void cancel(ServerInterface &srvInterface);
virtual void destroy(ServerInterface &srvInterface, SizedColumnTypes &returnType);
virtual RejectedRecord getRejectedRecord();
UDParser API 提供了以下通过子类扩展的方法:
public void setup(ServerInterface srvInterface, SizedColumnTypes returnType)
throws UdfException;
public abstract StreamState process(ServerInterface srvInterface,
DataBuffer input, InputState input_state)
throws UdfException, DestroyInvocation;
protected void cancel(ServerInterface srvInterface);
public void destroy(ServerInterface srvInterface, SizedColumnTypes returnType)
throws UdfException;
public RejectedRecord getRejectedRecord() throws UdfException;
UDParser
使用 StreamWriter
写入其输出。 StreamWriter
为所有基本类型提供多种方法,例如 setBooleanValue()
、setStringValue()
等。在 Java API 中,该类还提供可自动设置数据类型的 setValue()
方法。
前述方法可写入单个列值。 StreamWriter
还提供用于从映射写入完整行的方法。setRowFromMap()
方法使用列名称和值的映射,并将所有值写入到相应的列。此方法不定义新列,而是仅将值写入到现有列。JsonParser
示例使用此方法写入任意 JSON 输入。(请参阅Java 示例:JSON 解析器。)
注意
setRowFromMap()
方法不会自动使输入前进到下一行;您必须调用 next()
。这样,您可以读取某个行,然后覆盖所选列值。
setRowsFromMap()
还会使用提供的完整映射填充 Flex 表(请参阅 Flex 表)的任何 VMap ('raw') 列。在大多数情况下,setRowsFromMap()
是用于填充弹性表的适当方法。但是,您也可以使用 setVMap()
(类似于其他 setValue()
方法)在指定的列中生成 VMap 值。
setRowFromMap()
方法可自动强制将输入值转换为使用关联的 TypeCoercion
为这些列定义的类型。在大多数情况下,使用默认实施 (StandardTypeCoercion
) 都是合适的。
TypeCoercion
使用策略来控制其行为。例如,FAIL_INVALID_INPUT_VALUE
策略指示将无效输入视为错误,而非使用 null 值。系统会捕获错误,并将其作为拒绝进行处理(请参阅用户定义的解析器中的“拒绝行”)。策略还能控制是否截断太长的输入。对解析器的 TypeCoercion
使用 setPolicy()
方法可设置策略。有关支持的值,请参阅 API 文档。
除了设置这些策略之外,您可能还需要自定义类型强制转换。要执行此操作,请子类化提供的 TypeCoercion
实施之一,并覆盖
asType()
方法。如果解析器将读取来自第三方库的对象,则有必要进行此自定义。例如,用于处理地理坐标的解析器可以覆盖 asLong
,以将诸如“40.4397N”等输入转换为数字。有关实施的列表,请参阅 Vertica API 文档。
UDParser API 提供了以下通过子类扩展的方法:
class PyUDParser(vertica_sdk.UDSource):
def __init__(self):
pass
def setup(srvInterface, returnType):
pass
def process(self, srvInterface, inputbuffer, inputstate, streamwriter):
# User implement process function.
# User reads data from inputbuffer and parse the data.
# Rows are emitted via the streamwriter argument
return StreamState.DONE
在 Python 中,process()
方法需要输入缓冲区和输出缓冲区(请参阅 InputBuffer API 和 OutputBuffer API)。输入缓冲区表示您要解析的信息源。输出缓冲区会将筛选后的信息传递给 Vertica。
如果筛选器拒绝记录,请使用方法 REJECT()
来识别被拒绝的数据和拒绝原因。