这是本节的多页打印视图。
点击此处打印.
返回本页常规视图.
用户自定义的筛选器
使用用户定义的筛选器,您可以通过多种方式处理从源获取的数据。例如,筛选器可以执行下列操作:
-
处理其压缩格式不受 Vertica 原生支持的压缩文件。
-
接收 UTF-16 编码数据并将其转码为 UTF-8 编码。
-
对数据执行搜索和替换操作,然后再将数据加载到 Vertica 中。
您还可以通过多个筛选器处理数据,然后再将数据加载到 Vertica 中。例如,您可以解压缩使用 GZip 格式压缩的文件,然后将内容从 UTF-16 转换为 UTF-8,最后搜索并替换某些文本字符串。
如果您实施 UDFilter
,还必须实施相应的 FilterFactory
。
有关 API 详细信息,请参阅 UDFilter 类 和 FilterFactory 类。
1 - UDFilter 类
UDFilter
类负责从源读取原始输入数据,以及使数据准备好加载到 Vertica 或由解析器处理。此准备可能涉及解压缩、重新编码或任何其他类型的二进制处理。
UDFilter
由 Vertica 群集中对数据源执行筛选的每个主机上相应的 FilterFactory
实例化。
UDFilter 方法
您的 UDFilter
子类必须覆盖 process()
或 processWithMetadata()
:
注意
processWithMetadata()
仅适用于以 C++ 编程语言编写的用户定义扩展 (UDx)。
-
process()
将原始输入流作为一个大文件读取。如果有任何错误或故障,整个加载将失败。
上游源实施 processWithMetadata()
时可以实施 process()
,但可能会导致解析错误。
-
processWithMetadata()
当数据源具有以某种结构化格式(与数据有效负载分开)提供的关于记录边界的元数据时很有用。使用此接口,源除了发出数据之外,还会为每个记录发出记录长度。
通过在每个阶段实施 processWithMetadata()
而不是 process()
,您可以在整个加载堆栈中保留此记录长度元数据,从而实施更有效的解析,以在每个消息的基础上而不是每个文件或每个源的基础上从错误中恢复。 KafkaSource 和 Kafka 解析器(KafkaAvroParser、KafkaJSONParser 和 KafkaParser)在单个 Kafka 消息损坏时使用这种机制来支持每个 Kafka 消息的拒绝。
将 processWithMetadata()
与 UDFilter
子类一起使用,您可以编写一个内部筛选器,该筛选器将源中的记录长度元数据集成到数据流中,生成带边界信息的单字节流以帮助解析器提取和处理单个消息。KafkaInsertDelimeters 和 KafkaInsertLengths 使用这种机制将消息边界信息插入到 Kafka 数据流中。
注意
要实施 processWithMetadata()
,您必须覆盖 useSideChannel()
以返回 true
。
或者,您可以覆盖其他 UDFilter
类方法。
筛选器执行
以下部分详细说明了每次调用用户定义的筛选器时的执行序列。以下示例覆盖 process()
方法。
正在设置
COPY 在第一次调用 process()
之前调用了 setup()
。使用 setup()
可执行任何必要的设置步骤以使过滤器正常工作,例如,初始化数据结构以在过滤期间使用。您的对象可能在使用期间已损坏并重新创建,因此请确保您的对象可以重新启动。
过滤数据
COPY 将在查询执行期间重复调用 process()
以过滤数据。此方法将在其参数中收到 DataBuffer
类的两个实例以及一个输入缓冲区和一个输出缓冲区。您的实施应从输入缓冲区读取数据,并以某种方式(例如解压缩)处理数据,然后将结果写入到输出缓冲区。您的实施所读取的字节数和所写入的字节数之间可能不存在一对一关联。process()
方法应处理数据,直至没有更多数据可读取或输出缓冲区中的空间已用完。当出现这两种情况之一时,该方法应返回以下由 StreamState
定义的值之一:
-
OUTPUT_NEEDED,如果过滤器的输出缓冲区需要更多空间。
-
INPUT_NEEDED,如果过滤器已用完输入数据(但尚未完全处理数据源)。
-
DONE,如果过滤器已处理数据源中的所有数据。
-
KEEP_GOING,如果过滤器在很长一段时间内无法继续执行操作。系统会再次调用该方法,因此请勿无限期阻止该方法,否则会导致用户无法取消查询。
返回之前,process()
方法必须在每个 DataBuffer
中设置 offset
属性。在输入缓冲区中,请将该属性设置为方法成功读取的字节数。在输出缓冲区中,请将该属性设置为方法已写入的字节数。通过设置这些属性,对 process()
发出的下一次调用可以在缓冲区中的正确位置继续读取和写入数据。
process()
方法还需要检查传递给它的 InputState
对象,以确定数据源中是否存在更多数据。如果此对象等于 END_OF_FILE,则输入数据中剩余的数据是数据源中的最后数据。处理完所有剩余的数据后,process()
必须返回 DONE。
分解
COPY 将在最后一次调用 process()
之后调用 destroy()
。此方法可释放由 setup()
或 process()
方法预留的任何资源。Vertica 将在 process()
方法指示已完成对数据流中所有数据的筛选之后调用此方法。
如果仍有数据源尚未处理,Vertica 可能会在稍后再次对该对象调用 setup()
。在后续发出调用时,Vertica 会指示该方法筛选新的数据流中的数据。因此,destroy()
方法应使 UDFilter
子类的对象处于某种状态,以便 setup()
方法可为重复使用该对象做好准备。
API
UDFilter API 提供了以下通过子类扩展的方法:
UDFilter API 提供了以下通过子类扩展的方法:
UDFilter API 提供了以下通过子类扩展的方法:
2 - FilterFactory 类
如果编写过滤器,您还必须编写用于生成过滤器实例的过滤器工厂。为此,请对 FilterFactory
类设置子类。
子类将执行函数执行的初始验证和规划工作,并将每个主机(将进行数据过滤)上的 UDFilter
对象实例化。
过滤器工厂是单例。您的子类必须为无状态子类,没有包含数据的字段。该子类还不得修改任何全局变量。
FilterFactory 方法
FilterFactory
类定义了以下方法。您的子类必须覆盖 prepare()
方法。可以覆盖其他方法。
设置
Vertica 将在启动程序节点上调用 plan()
一次,以执行下列任务:
-
检查已从 COPY 语句中的函数调用传递的任何参数和错误消息(如果出现任何问题)。您通过从传递至 plan()
方法的 ServerInterface
的实例中获得 ParamReader
对象读取参数。
-
存储各个主机所需的任何信息,以便过滤作为参数传递的 PlanContext
实例中的数据。例如,您可以存储过滤器将读取的输入格式的详细信息,并输出该过滤器应生成的格式。plan()
方法仅在启动程序节点上运行,而 prepare()
方法则在每个读取数据源的主机上运行。因此,该对象是它们之间的唯一通信方式。
您通过从 getWriter()
方法中获取 ParamWriter
对象在 PlanContext
中存储数据。然后,通过对 ParamWriter
调用方法(比如 setString
)写入参数。
注意
ParamWriter
仅提供存储简单数据类型的能力。对于复杂的类型,您需要以某种方式对数据进行序列化并将其作为字符串或长字符串存储。
创建筛选器
Vertica 将调用 prepare()
,以创建并初始化筛选器。它在将执行过滤的每个节点上调用此方法一次。Vertica 根据可用资源自动选择可完成该工作的最佳节点。您无法指定在哪些节点上完成工作。
定义参数
实施 getParameterTypes()
可定义过滤器所使用的参数的名称和类型。Vertica 使用此信息对调用人员发出有关未知或缺失参数的警告。Vertica 会忽略未知参数并为缺失参数使用默认值。当您需要为函数定义类型和参数时,您不需要覆盖此方法。
API
FilterFactory API 提供了以下通过子类扩展的方法:
创建 FilterFactory
之后,您必须将其注册到 RegisterFactory
宏。
FilterFactory API 提供了以下通过子类扩展的方法:
FilterFactory API 提供了以下通过子类扩展的方法:
3 - Java 示例: ReplaceCharFilter
本节中的示例演示了创建一个 UDFilter
,以用于在输出流中将输入流中某个字符的任何实例替换为另一个字符。此示例是高度简化的示例,并假设输入流为 ASCII 数据。
始终应记住的是,UDFilter
中的输入流和输出流实际上是二进制数据。如果要使用 UDFilter
执行字符转换,请将数据流从字节字符串转换为正确编码的字符串。例如,输入流可能包含 UTF-8 编码文本。如果是的话,务必先将要从缓冲区中读取的原始二进制数据转换为 UTF 字符串,然后再进行处理。
加载和使用示例
示例 UDFilter
具有两个必需参数。from_char
参数指定了要替换的字符,而 to_char
参数指定了替换字符。按如下所示加载并使用 ReplaceCharFilter
UDFilter:
=> CREATE LIBRARY JavaLib AS '/home/dbadmin/JavaUDlLib.jar'
->LANGUAGE 'JAVA';
CREATE LIBRARY
=> CREATE FILTER ReplaceCharFilter as LANGUAGE 'JAVA'
->name 'com.mycompany.UDL.ReplaceCharFilterFactory' library JavaLib;
CREATE FILTER FUNCTION
=> CREATE TABLE t (text VARCHAR);
CREATE TABLE
=> COPY t FROM STDIN WITH FILTER ReplaceCharFilter(from_char='a', to_char='z');
Enter data to be copied followed by a newline.
End with a backslash and a period on a line by itself.
>> Mary had a little lamb
>> a man, a plan, a canal, Panama
>> \.
=> SELECT * FROM t;
text
--------------------------------
Mzry hzd z little lzmb
z mzn, z plzn, z cznzl, Pznzmz
(2 rows)
=> --Calling the filter with incorrect parameters returns errors
=> COPY t from stdin with filter ReplaceCharFilter();
ERROR 3399: Failure in UDx RPC call InvokePlanUDL(): Error in User Defined Object [
ReplaceCharFilter], error code: 0
com.vertica.sdk.UdfException: You must supply two parameters to ReplaceChar: 'from_char' and 'to_char'
at com.vertica.JavaLibs.ReplaceCharFilterFactory.plan(ReplaceCharFilterFactory.java:22)
at com.vertica.udxfence.UDxExecContext.planUDFilter(UDxExecContext.java:889)
at com.vertica.udxfence.UDxExecContext.planCurrentUDLType(UDxExecContext.java:865)
at com.vertica.udxfence.UDxExecContext.planUDL(UDxExecContext.java:821)
at com.vertica.udxfence.UDxExecContext.run(UDxExecContext.java:242)
at java.lang.Thread.run(Thread.java:662)
解析器实施
ReplaceCharFilter
类将读取数据流,并将用户指定字符的每个实例替换为另一个字符。
工厂实施
ReplaceCharFilterFactory
需要两个参数(from_char
和 to_char
)。plan()
方法可验证这些参数是否存在以及是否为单字符字符串。然后,此方法会将它们存储在计划上下文中。prepare()
方法可获取参数值并将参数值传递到 ReplaceCharFilter
对象,然后将这些对话实例化以执行过滤。
4 - C++ 示例:转换编码
以下示例显示了如何通过将 UTF-16 编码数据转换为 UTF-8 来将文件的编码从一种类型转换为另一种类型。您可以在 SDK 的 /opt/vertica/sdk/examples/FilterFunctions/IConverter.cpp
目录中找到此示例。
筛选器实施
工厂实施