这是本节的多页打印视图。 点击此处打印.

返回本页常规视图.

用户自定义的筛选器

使用用户定义的筛选器,您可以通过多种方式处理从源获取的数据。例如,筛选器可以执行下列操作:

  • 处理其压缩格式不受 Vertica 原生支持的压缩文件。

  • 接收 UTF-16 编码数据并将其转码为 UTF-8 编码。

  • 对数据执行搜索和替换操作,然后再将数据加载到 Vertica 中。

您还可以通过多个筛选器处理数据,然后再将数据加载到 Vertica 中。例如,您可以解压缩使用 GZip 格式压缩的文件,然后将内容从 UTF-16 转换为 UTF-8,最后搜索并替换某些文本字符串。

如果您实施 UDFilter,还必须实施相应的 FilterFactory

有关 API 详细信息,请参阅 UDFilter 类FilterFactory 类

1 - UDFilter 类

UDFilter 类负责从源读取原始输入数据,以及使数据准备好加载到 Vertica 或由解析器处理。此准备可能涉及解压缩、重新编码或任何其他类型的二进制处理。

UDFilter 由 Vertica 群集中对数据源执行筛选的每个主机上相应的 FilterFactory 实例化。

UDFilter 方法

您的 UDFilter 子类必须覆盖 process()processWithMetadata()

  • process() 将原始输入流作为一个大文件读取。如果有任何错误或故障,整个加载将失败。
    上游源实施 processWithMetadata() 时可以实施 process(),但可能会导致解析错误。

  • processWithMetadata() 当数据源具有以某种结构化格式(与数据有效负载分开)提供的关于记录边界的元数据时很有用。使用此接口,源除了发出数据之外,还会为每个记录发出记录长度。

    通过在每个阶段实施 processWithMetadata() 而不是 process(),您可以在整个加载堆栈中保留此记录长度元数据,从而实施更有效的解析,以在每个消息的基础上而不是每个文件或每个源的基础上从错误中恢复。 KafkaSource 和 Kafka 解析器(KafkaAvroParserKafkaJSONParserKafkaParser)在单个 Kafka 消息损坏时使用这种机制来支持每个 Kafka 消息的拒绝。

    processWithMetadata()UDFilter 子类一起使用,您可以编写一个内部筛选器,该筛选器将源中的记录长度元数据集成到数据流中,生成带边界信息的单字节流以帮助解析器提取和处理单个消息。KafkaInsertDelimetersKafkaInsertLengths 使用这种机制将消息边界信息插入到 Kafka 数据流中。

或者,您可以覆盖其他 UDFilter 类方法。

筛选器执行

以下部分详细说明了每次调用用户定义的筛选器时的执行序列。以下示例覆盖 process() 方法。

正在设置
COPY 在第一次调用 process() 之前调用了 setup()。使用 setup() 可执行任何必要的设置步骤以使过滤器正常工作,例如,初始化数据结构以在过滤期间使用。您的对象可能在使用期间已损坏并重新创建,因此请确保您的对象可以重新启动。

过滤数据
COPY 将在查询执行期间重复调用 process() 以过滤数据。此方法将在其参数中收到 DataBuffer 类的两个实例以及一个输入缓冲区和一个输出缓冲区。您的实施应从输入缓冲区读取数据,并以某种方式(例如解压缩)处理数据,然后将结果写入到输出缓冲区。您的实施所读取的字节数和所写入的字节数之间可能不存在一对一关联。process() 方法应处理数据,直至没有更多数据可读取或输出缓冲区中的空间已用完。当出现这两种情况之一时,该方法应返回以下由 StreamState 定义的值之一:

  • OUTPUT_NEEDED,如果过滤器的输出缓冲区需要更多空间。

  • INPUT_NEEDED,如果过滤器已用完输入数据(但尚未完全处理数据源)。

  • DONE,如果过滤器已处理数据源中的所有数据。

  • KEEP_GOING,如果过滤器在很长一段时间内无法继续执行操作。系统会再次调用该方法,因此请勿无限期阻止该方法,否则会导致用户无法取消查询。

返回之前,process() 方法必须在每个 DataBuffer 中设置 offset 属性。在输入缓冲区中,请将该属性设置为方法成功读取的字节数。在输出缓冲区中,请将该属性设置为方法已写入的字节数。通过设置这些属性,对 process() 发出的下一次调用可以在缓冲区中的正确位置继续读取和写入数据。

process() 方法还需要检查传递给它的 InputState 对象,以确定数据源中是否存在更多数据。如果此对象等于 END_OF_FILE,则输入数据中剩余的数据是数据源中的最后数据。处理完所有剩余的数据后,process() 必须返回 DONE。

分解
COPY 将在最后一次调用 process() 之后调用 destroy()。此方法可释放由 setup()process() 方法预留的任何资源。Vertica 将在 process() 方法指示已完成对数据流中所有数据的筛选之后调用此方法。

如果仍有数据源尚未处理,Vertica 可能会在稍后再次对该对象调用 setup()。在后续发出调用时,Vertica 会指示该方法筛选新的数据流中的数据。因此,destroy() 方法应使 UDFilter 子类的对象处于某种状态,以便 setup() 方法可为重复使用该对象做好准备。

API

UDFilter API 提供了以下通过子类扩展的方法:

virtual void setup(ServerInterface &srvInterface);

virtual bool useSideChannel();

virtual StreamState process(ServerInterface &srvInterface, DataBuffer &input, InputState input_state, DataBuffer &output)=0;

virtual StreamState processWithMetadata(ServerInterface &srvInterface, DataBuffer &input,
    LengthBuffer &input_lengths, InputState input_state, DataBuffer &output, LengthBuffer &output_lengths)=0;

virtual void cancel(ServerInterface &srvInterface);

virtual void destroy(ServerInterface &srvInterface);

UDFilter API 提供了以下通过子类扩展的方法:

public void setup(ServerInterface srvInterface) throws UdfException;

public abstract StreamState process(ServerInterface srvInterface, DataBuffer input,
                InputState input_state, DataBuffer output)
    throws UdfException;

protected void cancel(ServerInterface srvInterface);

public void destroy(ServerInterface srvInterface) throws UdfException;

UDFilter API 提供了以下通过子类扩展的方法:

class PyUDFilter(vertica_sdk.UDFilter):
    def __init__(self):
        pass

    def setup(self, srvInterface):
        pass

    def process(self, srvInterface, inputbuffer, outputbuffer, inputstate):
        # User process data here, and put into outputbuffer.
        return StreamState.DONE

2 - FilterFactory 类

如果编写过滤器,您还必须编写用于生成过滤器实例的过滤器工厂。为此,请对 FilterFactory 类设置子类。

子类将执行函数执行的初始验证和规划工作,并将每个主机(将进行数据过滤)上的 UDFilter 对象实例化。

过滤器工厂是单例。您的子类必须为无状态子类,没有包含数据的字段。该子类还不得修改任何全局变量。

FilterFactory 方法

FilterFactory 类定义了以下方法。您的子类必须覆盖 prepare() 方法。可以覆盖其他方法。

设置

Vertica 将在启动程序节点上调用 plan() 一次,以执行下列任务:

  • 检查已从 COPY 语句中的函数调用传递的任何参数和错误消息(如果出现任何问题)。您通过从传递至 plan() 方法的 ServerInterface 的实例中获得 ParamReader 对象读取参数。

  • 存储各个主机所需的任何信息,以便过滤作为参数传递的 PlanContext 实例中的数据。例如,您可以存储过滤器将读取的输入格式的详细信息,并输出该过滤器应生成的格式。plan() 方法仅在启动程序节点上运行,而 prepare() 方法则在每个读取数据源的主机上运行。因此,该对象是它们之间的唯一通信方式。

    您通过从 getWriter() 方法中获取 ParamWriter 对象在 PlanContext 中存储数据。然后,通过对 ParamWriter 调用方法(比如 setString)写入参数。

创建筛选器

Vertica 将调用 prepare(),以创建并初始化筛选器。它在将执行过滤的每个节点上调用此方法一次。Vertica 根据可用资源自动选择可完成该工作的最佳节点。您无法指定在哪些节点上完成工作。

定义参数

实施 getParameterTypes() 可定义过滤器所使用的参数的名称和类型。Vertica 使用此信息对调用人员发出有关未知或缺失参数的警告。Vertica 会忽略未知参数并为缺失参数使用默认值。当您需要为函数定义类型和参数时,您不需要覆盖此方法。

API

FilterFactory API 提供了以下通过子类扩展的方法:

virtual void plan(ServerInterface &srvInterface, PlanContext &planCtxt);

virtual UDFilter * prepare(ServerInterface &srvInterface, PlanContext &planCtxt)=0;

virtual void getParameterType(ServerInterface &srvInterface, SizedColumnTypes &parameterTypes);

创建 FilterFactory 之后,您必须将其注册到 RegisterFactory 宏。

FilterFactory API 提供了以下通过子类扩展的方法:

public void plan(ServerInterface srvInterface, PlanContext planCtxt)
    throws UdfException;

public abstract UDFilter prepare(ServerInterface srvInterface, PlanContext planCtxt)
    throws UdfException;

public void getParameterType(ServerInterface srvInterface, SizedColumnTypes parameterTypes);

FilterFactory API 提供了以下通过子类扩展的方法:


class PyFilterFactory(vertica_sdk.SourceFactory):
    def __init__(self):
        pass
    def plan(self):
        pass
    def prepare(self, planContext):
        #User implement the function to create PyUDSources.
        pass

3 - Java 示例: ReplaceCharFilter

本节中的示例演示了创建一个 UDFilter,以用于在输出流中将输入流中某个字符的任何实例替换为另一个字符。此示例是高度简化的示例,并假设输入流为 ASCII 数据。

始终应记住的是,UDFilter 中的输入流和输出流实际上是二进制数据。如果要使用 UDFilter 执行字符转换,请将数据流从字节字符串转换为正确编码的字符串。例如,输入流可能包含 UTF-8 编码文本。如果是的话,务必先将要从缓冲区中读取的原始二进制数据转换为 UTF 字符串,然后再进行处理。

加载和使用示例

示例 UDFilter 具有两个必需参数。from_char 参数指定了要替换的字符,而 to_char 参数指定了替换字符。按如下所示加载并使用 ReplaceCharFilter UDFilter:

=> CREATE LIBRARY JavaLib AS '/home/dbadmin/JavaUDlLib.jar'
->LANGUAGE 'JAVA';
CREATE LIBRARY
=> CREATE FILTER ReplaceCharFilter as LANGUAGE 'JAVA'
->name 'com.mycompany.UDL.ReplaceCharFilterFactory' library JavaLib;
CREATE FILTER FUNCTION
=> CREATE TABLE t (text VARCHAR);
CREATE TABLE
=> COPY t FROM STDIN WITH FILTER ReplaceCharFilter(from_char='a', to_char='z');
Enter data to be copied followed by a newline.
End with a backslash and a period on a line by itself.
>> Mary had a little lamb
>> a man, a plan, a canal, Panama
>> \.

=> SELECT * FROM t;
              text
--------------------------------
 Mzry hzd z little lzmb
 z mzn, z plzn, z cznzl, Pznzmz
(2 rows)

=> --Calling the filter with incorrect parameters returns errors
=> COPY t from stdin with filter ReplaceCharFilter();
ERROR 3399:  Failure in UDx RPC call InvokePlanUDL(): Error in User Defined Object [
ReplaceCharFilter], error code: 0
com.vertica.sdk.UdfException: You must supply two parameters to ReplaceChar: 'from_char' and 'to_char'
        at com.vertica.JavaLibs.ReplaceCharFilterFactory.plan(ReplaceCharFilterFactory.java:22)
        at com.vertica.udxfence.UDxExecContext.planUDFilter(UDxExecContext.java:889)
        at com.vertica.udxfence.UDxExecContext.planCurrentUDLType(UDxExecContext.java:865)
        at com.vertica.udxfence.UDxExecContext.planUDL(UDxExecContext.java:821)
        at com.vertica.udxfence.UDxExecContext.run(UDxExecContext.java:242)
        at java.lang.Thread.run(Thread.java:662)

解析器实施

ReplaceCharFilter 类将读取数据流,并将用户指定字符的每个实例替换为另一个字符。

package com.vertica.JavaLibs;

import com.vertica.sdk.DataBuffer;
import com.vertica.sdk.ServerInterface;
import com.vertica.sdk.State.InputState;
import com.vertica.sdk.State.StreamState;
import com.vertica.sdk.UDFilter;

public class ReplaceCharFilter extends UDFilter {

    private byte[] fromChar;
    private byte[] toChar;

    public ReplaceCharFilter(String fromChar, String toChar){
        // Stores the from char and to char as byte arrays. This is
        // not a robust method of doing this, but works for this simple
        // example.
        this.fromChar= fromChar.getBytes();
        this.toChar=toChar.getBytes();
    }
    @Override
    public StreamState process(ServerInterface srvInterface, DataBuffer input,
            InputState input_state, DataBuffer output) {

        // Check if there is no more input and the input buffer has been completely
        // processed. If so, filtering is done.
        if (input_state == InputState.END_OF_FILE && input.buf.length == 0) {
            return StreamState.DONE;
        }

        // Get current position in the input buffer
        int offset = output.offset;

        // Determine how many bytes to process. This is either until input
        // buffer is exhausted or output buffer is filled
        int limit = Math.min((input.buf.length - input.offset),
                (output.buf.length - output.offset));

        for (int i = input.offset; i < limit; i++) {
            // This example just replaces each instance of from_char
            // with to_char. It does not consider things such as multi-byte
            // UTF-8 characters.
            if (input.buf[i] == fromChar[0]) {
                output.buf[i+offset] = toChar[0];
            } else {
                // Did not find from_char, so copy input to the output
                output.buf[i+offset]=input.buf[i];
            }
        }

        input.offset += limit;
        output.offset += input.offset;

        if (input.buf.length - input.offset < output.buf.length - output.offset) {
            return StreamState.INPUT_NEEDED;
        } else {
            return StreamState.OUTPUT_NEEDED;
        }
    }
}

工厂实施

ReplaceCharFilterFactory 需要两个参数(from_charto_char)。plan() 方法可验证这些参数是否存在以及是否为单字符字符串。然后,此方法会将它们存储在计划上下文中。prepare() 方法可获取参数值并将参数值传递到 ReplaceCharFilter 对象,然后将这些对话实例化以执行过滤。

package com.vertica.JavaLibs;

import java.util.ArrayList;
import java.util.Vector;

import com.vertica.sdk.FilterFactory;
import com.vertica.sdk.PlanContext;
import com.vertica.sdk.ServerInterface;
import com.vertica.sdk.SizedColumnTypes;
import com.vertica.sdk.UDFilter;
import com.vertica.sdk.UdfException;

public class ReplaceCharFilterFactory extends FilterFactory {

    // Run on the initiator node to perform varification and basic setup.
    @Override
    public void plan(ServerInterface srvInterface,PlanContext planCtxt)
                     throws UdfException {
        ArrayList<String> args =
                          srvInterface.getParamReader().getParamNames();

        // Ensure user supplied two arguments
        if (!(args.contains("from_char") && args.contains("to_char"))) {
            throw new UdfException(0, "You must supply two parameters" +
                        " to ReplaceChar: 'from_char' and 'to_char'");
        }

        // Verify that the from_char is a single character.
        String fromChar = srvInterface.getParamReader().getString("from_char");
        if (fromChar.length() != 1) {
            String message =  String.format("Replacechar expects a single " +
                "character in the 'from_char' parameter. Got length %d",
                fromChar.length());
            throw new UdfException(0, message);
        }

        // Save the from character in the plan context, to be read by
        // prepare() method.
        planCtxt.getWriter().setString("fromChar",fromChar);

        // Ensure to character parameter is a single characater
        String toChar = srvInterface.getParamReader().getString("to_char");
        if (toChar.length() != 1) {
            String message =  String.format("Replacechar expects a single "
                 + "character in the 'to_char' parameter. Got length %d",
                toChar.length());
            throw new UdfException(0, message);
        }
        // Save the to character in the plan data
        planCtxt.getWriter().setString("toChar",toChar);
    }

    // Called on every host that will filter data. Must instantiate the
    // UDFilter subclass.
    @Override
    public UDFilter prepare(ServerInterface srvInterface, PlanContext planCtxt)
                            throws UdfException {
        // Get data stored in the context by the plan() method.
        String fromChar = planCtxt.getWriter().getString("fromChar");
        String toChar = planCtxt.getWriter().getString("toChar");

        // Instantiate a filter object to perform filtering.
        return new ReplaceCharFilter(fromChar, toChar);
    }

    // Describe the parameters accepted by this filter.
    @Override
    public void getParameterType(ServerInterface srvInterface,
             SizedColumnTypes parameterTypes) {
        parameterTypes.addVarchar(1, "from_char");
        parameterTypes.addVarchar(1, "to_char");
    }
}

4 - C++ 示例:转换编码

以下示例显示了如何通过将 UTF-16 编码数据转换为 UTF-8 来将文件的编码从一种类型转换为另一种类型。您可以在 SDK 的 /opt/vertica/sdk/examples/FilterFunctions/IConverter.cpp 目录中找到此示例。

筛选器实施

class Iconverter : public UDFilter{
private:
    std::string fromEncoding, toEncoding;
    iconv_t cd; // the conversion descriptor opened
    uint converted; // how many characters have been converted
protected:
    virtual StreamState process(ServerInterface &srvInterface, DataBuffer &input,
                                InputState input_state, DataBuffer &output)
    {
        char *input_buf = (char *)input.buf + input.offset;
        char *output_buf = (char *)output.buf + output.offset;
        size_t inBytesLeft = input.size - input.offset, outBytesLeft = output.size - output.offset;
        // end of input
        if (input_state == END_OF_FILE && inBytesLeft == 0)
        {
            // Gnu libc iconv doc says, it is good practice to finalize the
            // outbuffer for stateful encodings (by calling with null inbuffer).
            //
            // http://www.gnu.org/software/libc/manual/html_node/Generic-Conversion-Interface.html
            iconv(cd, NULL, NULL, &output_buf, &outBytesLeft);
            // output buffer can be updated by this operation
            output.offset = output.size - outBytesLeft;
            return DONE;
        }
        size_t ret = iconv(cd, &input_buf, &inBytesLeft, &output_buf, &outBytesLeft);
        // if conversion is successful, we ask for more input, as input has not reached EOF.
        StreamState retStatus = INPUT_NEEDED;
        if (ret == (size_t)(-1))
        {
            // seen an error
            switch (errno)
            {
            case E2BIG:
                // input size too big, not a problem, ask for more output.
                retStatus = OUTPUT_NEEDED;
                break;
            case EINVAL:
                // input stops in the middle of a byte sequence, not a problem, ask for more input
                retStatus = input_state == END_OF_FILE ? DONE : INPUT_NEEDED;
                break;
            case EILSEQ:
                // invalid sequence seen, throw
                // TODO: reporting the wrong byte position
                vt_report_error(1, "Invalid byte sequence when doing %u-th conversion", converted);
            case EBADF:
                // something wrong with descriptor, throw
                vt_report_error(0, "Invalid descriptor");
            default:
                vt_report_error(0, "Uncommon Error");
                break;
            }
        }
        else converted += ret;
        // move position pointer
        input.offset = input.size - inBytesLeft;
        output.offset = output.size - outBytesLeft;
        return retStatus;
    }
public:
    Iconverter(const std::string &from, const std::string &to)
    : fromEncoding(from), toEncoding(to), converted(0)
    {
        // note "to encoding" is first argument to iconv...
        cd = iconv_open(to.c_str(), from.c_str());
        if (cd == (iconv_t)(-1))
        {
            // error when creating converters.
            vt_report_error(0, "Error initializing iconv: %m");
        }
    }
    ~Iconverter()
    {
        // free iconv resources;
        iconv_close(cd);
    }
};

工厂实施

class IconverterFactory : public FilterFactory{
public:
    virtual void plan(ServerInterface &srvInterface,
            PlanContext &planCtxt) {
        std::vector<std::string> args = srvInterface.getParamReader().getParamNames();
        /* Check parameters */
        if (!(args.size() == 0 ||
                (args.size() == 1 && find(args.begin(), args.end(), "from_encoding")
                        != args.end()) || (args.size() == 2
                        && find(args.begin(), args.end(), "from_encoding") != args.end()
                        && find(args.begin(), args.end(), "to_encoding") != args.end()))) {
            vt_report_error(0, "Invalid arguments.  Must specify either no arguments,  or "
                               "'from_encoding' alone, or 'from_encoding' and 'to_encoding'.");
        }
        /* Populate planData */
        // By default, we do UTF16->UTF8, and x->UTF8
        VString from_encoding = planCtxt.getWriter().getStringRef("from_encoding");
        VString to_encoding = planCtxt.getWriter().getStringRef("to_encoding");
        from_encoding.copy("UTF-16");
        to_encoding.copy("UTF-8");
        if (args.size() == 2)
        {
            from_encoding.copy(srvInterface.getParamReader().getStringRef("from_encoding"));
            to_encoding.copy(srvInterface.getParamReader().getStringRef("to_encoding"));
        }
        else if (args.size() == 1)
        {
            from_encoding.copy(srvInterface.getParamReader().getStringRef("from_encoding"));
        }
        if (!from_encoding.length()) {
            vt_report_error(0, "The empty string is not a valid from_encoding value");
        }
        if (!to_encoding.length()) {
            vt_report_error(0, "The empty string is not a valid to_encoding value");
        }
    }
    virtual UDFilter* prepare(ServerInterface &srvInterface,
            PlanContext &planCtxt) {
        return vt_createFuncObj(srvInterface.allocator, Iconverter,
                planCtxt.getReader().getStringRef("from_encoding").str(),
                planCtxt.getReader().getStringRef("to_encoding").str());
    }
    virtual void getParameterType(ServerInterface &srvInterface,
                                  SizedColumnTypes &parameterTypes) {
        parameterTypes.addVarchar(32, "from_encoding");
        parameterTypes.addVarchar(32, "to_encoding");
    }
};
RegisterFactory(IconverterFactory);