UDFilter 类

UDFilter 类负责从源读取原始输入数据,以及使数据准备好加载到 Vertica 或由解析器处理。此准备可能涉及解压缩、重新编码或任何其他类型的二进制处理。

UDFilter 由 Vertica 群集中对数据源执行筛选的每个主机上相应的 FilterFactory 实例化。

UDFilter 方法

您的 UDFilter 子类必须覆盖 process()processWithMetadata()

  • process() 将原始输入流作为一个大文件读取。如果有任何错误或故障,整个加载将失败。
    上游源实施 processWithMetadata() 时可以实施 process(),但可能会导致解析错误。

  • processWithMetadata() 当数据源具有以某种结构化格式(与数据有效负载分开)提供的关于记录边界的元数据时很有用。使用此接口,源除了发出数据之外,还会为每个记录发出记录长度。

    通过在每个阶段实施 processWithMetadata() 而不是 process(),您可以在整个加载堆栈中保留此记录长度元数据,从而实施更有效的解析,以在每个消息的基础上而不是每个文件或每个源的基础上从错误中恢复。 KafkaSource 和 Kafka 解析器(KafkaAvroParserKafkaJSONParserKafkaParser)在单个 Kafka 消息损坏时使用这种机制来支持每个 Kafka 消息的拒绝。

    processWithMetadata()UDFilter 子类一起使用,您可以编写一个内部筛选器,该筛选器将源中的记录长度元数据集成到数据流中,生成带边界信息的单字节流以帮助解析器提取和处理单个消息。KafkaInsertDelimetersKafkaInsertLengths 使用这种机制将消息边界信息插入到 Kafka 数据流中。

或者,您可以覆盖其他 UDFilter 类方法。

筛选器执行

以下部分详细说明了每次调用用户定义的筛选器时的执行序列。以下示例覆盖 process() 方法。

正在设置
COPY 在第一次调用 process() 之前调用了 setup()。使用 setup() 可执行任何必要的设置步骤以使过滤器正常工作,例如,初始化数据结构以在过滤期间使用。您的对象可能在使用期间已损坏并重新创建,因此请确保您的对象可以重新启动。

过滤数据
COPY 将在查询执行期间重复调用 process() 以过滤数据。此方法将在其参数中收到 DataBuffer 类的两个实例以及一个输入缓冲区和一个输出缓冲区。您的实施应从输入缓冲区读取数据,并以某种方式(例如解压缩)处理数据,然后将结果写入到输出缓冲区。您的实施所读取的字节数和所写入的字节数之间可能不存在一对一关联。process() 方法应处理数据,直至没有更多数据可读取或输出缓冲区中的空间已用完。当出现这两种情况之一时,该方法应返回以下由 StreamState 定义的值之一:

  • OUTPUT_NEEDED,如果过滤器的输出缓冲区需要更多空间。

  • INPUT_NEEDED,如果过滤器已用完输入数据(但尚未完全处理数据源)。

  • DONE,如果过滤器已处理数据源中的所有数据。

  • KEEP_GOING,如果过滤器在很长一段时间内无法继续执行操作。系统会再次调用该方法,因此请勿无限期阻止该方法,否则会导致用户无法取消查询。

返回之前,process() 方法必须在每个 DataBuffer 中设置 offset 属性。在输入缓冲区中,请将该属性设置为方法成功读取的字节数。在输出缓冲区中,请将该属性设置为方法已写入的字节数。通过设置这些属性,对 process() 发出的下一次调用可以在缓冲区中的正确位置继续读取和写入数据。

process() 方法还需要检查传递给它的 InputState 对象,以确定数据源中是否存在更多数据。如果此对象等于 END_OF_FILE,则输入数据中剩余的数据是数据源中的最后数据。处理完所有剩余的数据后,process() 必须返回 DONE。

分解
COPY 将在最后一次调用 process() 之后调用 destroy()。此方法可释放由 setup()process() 方法预留的任何资源。Vertica 将在 process() 方法指示已完成对数据流中所有数据的筛选之后调用此方法。

如果仍有数据源尚未处理,Vertica 可能会在稍后再次对该对象调用 setup()。在后续发出调用时,Vertica 会指示该方法筛选新的数据流中的数据。因此,destroy() 方法应使 UDFilter 子类的对象处于某种状态,以便 setup() 方法可为重复使用该对象做好准备。

API

UDFilter API 提供了以下通过子类扩展的方法:

virtual void setup(ServerInterface &srvInterface);

virtual bool useSideChannel();

virtual StreamState process(ServerInterface &srvInterface, DataBuffer &input, InputState input_state, DataBuffer &output)=0;

virtual StreamState processWithMetadata(ServerInterface &srvInterface, DataBuffer &input,
    LengthBuffer &input_lengths, InputState input_state, DataBuffer &output, LengthBuffer &output_lengths)=0;

virtual void cancel(ServerInterface &srvInterface);

virtual void destroy(ServerInterface &srvInterface);

UDFilter API 提供了以下通过子类扩展的方法:

public void setup(ServerInterface srvInterface) throws UdfException;

public abstract StreamState process(ServerInterface srvInterface, DataBuffer input,
                InputState input_state, DataBuffer output)
    throws UdfException;

protected void cancel(ServerInterface srvInterface);

public void destroy(ServerInterface srvInterface) throws UdfException;

UDFilter API 提供了以下通过子类扩展的方法:

class PyUDFilter(vertica_sdk.UDFilter):
    def __init__(self):
        pass

    def setup(self, srvInterface):
        pass

    def process(self, srvInterface, inputbuffer, outputbuffer, inputstate):
        # User process data here, and put into outputbuffer.
        return StreamState.DONE