这是本节的多页打印视图。 点击此处打印.

返回本页常规视图.

用户定义的加载 (UDL)

COPY 提供了用于控制如何加载数据的大量选项和设置。但是,您可能发现这些选项不适合您要执行的数据加载类型。使用用户定义的加载 (UDL) 功能,您可以开发一个或多个用于更改 COPY 语句的工作方式的函数。您可以使用 Vertica SDK 来创建用于处理加载过程中的各个步骤的自定义库。。

您可以在开发期间使用以下三种类型的 UDL 函数,每种类型适用于数据加载过程的每个阶段:

  • 用户自定义的源 (UDSource):控制 COPY 如何获取要加载到数据库中的数据。例如,COPY 获取数据的方法可能是通过 HTTP 或 cURL 提取数据。最多只能有一个 UDSource 从文件或输入流读取数据。UDSource 可以从多个源读取数据,但 COPY 只能调用一个 UDSource。

    API 支持:C++、Java。

  • 用户自定义的筛选器 (UDFilter):预处理数据。例如,过滤器可以解压缩文件或将 UTF-16 转换为 UTF-8。可以将多个用户定义的筛选器链接到一起,例如,先解压缩再转换。

    API 支持:C++、Java、Python。

  • 用户自定义的解析器 (UDParser):最多只能有一个解析器将数据解析为可供插入到表中的元组。例如,解析器可以从类似于 XML 的格式提取数据。您可以选择定义用户定义的块分割器(UDChunker,仅限 C++),以让解析器执行并行解析。

    API 支持:C++、Java、Python。

完成最后一步之后,COPY 会将数据插入到表中,或者在格式不正确时拒绝该数据。

1 - 用户定义的源

使用用户定义的源,您可以使用未内置在 Vertica 中的方法来处理数据源。例如,您可以编写用户定义的源,以使用 cURL 访问来自 HTTP 源的数据。虽然给定的 COPY 语句只能指定一个用户定义的源语句,但源函数本身可以从多个源中拉取数据。

UDSource 类将从外部源获取数据。此类可从输入流读取数据,并生成输出流以进行过滤和解析。如果实施 UDSource,则还必须实施相应的 SourceFactory

1.1 - UDSource 类

如果需要从 COPY 已不支持的源类型加载数据,您可以将 UDSource 类子类化。

UDSource 子类的每个实例从单个数据源读取数据。例如,单个数据源可以是单个文件或对 RESTful Web 应用程序执行的单个函数调用的结果。

UDSource 方法

您的 UDSource 子类必须覆盖 process()processWithMetadata()

  • process() 将原始输入流作为一个大文件读取。如果有任何错误或故障,整个加载将失败。

  • processWithMetadata() 当数据源具有以某种结构化格式(与数据有效负载分开)提供的关于记录边界的元数据时很有用。使用此接口,源除了发出数据之外,还会为每个记录发出记录长度。

    通过在每个阶段实施 processWithMetadata() 而不是 process(),您可以在整个加载堆栈中保留此记录长度元数据,从而实施更有效的解析,以在每个消息的基础上而不是每个文件或每个源的基础上从错误中恢复。 KafkaSource 和 Kafka 解析器(KafkaAvroParserKafkaJSONParserKafkaParser)会在各条 Kafka 消息无法解析时使用这种机制为拒绝每条 Kafka 消息提供支持。

此外,还可以覆盖其他 UDSource 类方法。

源执行

以下各节详细说明了每次调用用户定义的源时的执行序列。以下示例覆盖 process() 方法。

设置
COPY 在第一次调用 process() 之前调用了 setup()。使用 setup() 可执行访问数据源所需的任何设置步骤。此方法可以建立网络连接和打开文件,还可以执行类似的必需任务以使 UDSource 能够从数据源读取数据。您的对象可能在使用期间已损坏并重新创建,因此请确保您的对象可以重新启动。

处理源
COPY 将在查询执行期间重复调用 process(),以读取数据并将数据写入到作为参数传递的 DataBuffer。然后,此缓冲区会传递到第一个过滤器。

如果源已用完输入或已填满输出缓冲区,则它必须返回值 StreamState.OUTPUT_NEEDED。如果 Vertica 收到此返回值,它将再次调用该方法。此第二次调用在数据加载过程的下一个阶段已处理输出缓冲区之后进行。如果返回 StreamState.DONE,则指示已读取源中的所有数据。

用户可以取消加载操作,这样会中止读取。

分解
COPY 将在最后一次调用 process() 之后调用 destroy()。此方法可释放由 setup()process() 方法预留的任何资源,例如,setup() 方法分配的文件句柄或网络连接。

取值函数

一个源可以定义两个取值函数,即 getSize()getUri()

调用 process() 之前,COPY 可能会调用 getSize() 以估算要读取的数据的字节数。此值只是一个估算值,用于在 LOAD_STREAMS 表中指示文件大小。由于 Vertica 会在调用 setup() 之前调用该方法,因此 getSize() 不得依赖由 setup() 分配的任何资源。

此方法不应使任何资源处于打开状态。例如,请勿保存由 getSize() 打开以供 process() 方法使用的任何文件句柄,否则会耗尽可用资源,因为 Vertica 将在加载任何数据之前会对 UDSource 子类的所有实例调用 getSize()。如果正在打开许多数据源,这些打开的文件句柄可能会用完系统提供的文件句柄,从而导致没有任何其他文件句柄可用于执行实际的数据加载。

Vertica 将在执行期间调用 getUri(),以更新有关当前正在加载哪些资源的状态信息。它会返回由此 UDSource 读取的数据源的 URI。

API

UDSource API 提供了以下通过子类扩展的方法:

virtual void setup(ServerInterface &srvInterface);

virtual bool useSideChannel();

virtual StreamState process(ServerInterface &srvInterface, DataBuffer &output)=0;

virtual StreamState processWithMetadata(ServerInterface &srvInterface, DataBuffer &output, LengthBuffer &output_lengths)=0;

virtual void cancel(ServerInterface &srvInterface);

virtual void destroy(ServerInterface &srvInterface);

virtual vint getSize();

virtual std::string getUri();

UDSource API 提供了以下通过子类扩展的方法:

public void setup(ServerInterface srvInterface) throws UdfException;

public abstract StreamState process(ServerInterface srvInterface, DataBuffer output) throws UdfException;

protected void cancel(ServerInterface srvInterface);

public void destroy(ServerInterface srvInterface) throws UdfException;

public Integer getSize();

public String getUri();

1.2 - SourceFactory 类

如果编写源,您还必须编写源工厂。SourceFactory 类的子类负责执行下列任务:

  • 对传递到 UDSource 的参数执行初始验证。

  • 设置 UDSource 实例执行其工作所需的任何数据结构。此信息可能包括有关哪些节点将读取哪个数据源的记录。

  • 为函数在每个主机上读取的每个数据源(或其一部分)创建 UDSource 子类的一个实例。

最简单的源工厂将为每个执行程序节点的每个数据源创建一个 UDSource 实例。您还可以在每个节点上使用多个并发 UDSource 实例。此行为称为并发加载。为了支持这两个选项,SourceFactory 拥有可创建源的方法的两个版本。您必须准确实施其中一个版本。

源工厂是单例。您的子类必须为无状态子类,没有包含数据的字段。该子类还不得修改任何全局变量。

SourceFactory 方法

SourceFactory 类定义了多个方法。您的类必须覆盖 prepareUDSources();它可以覆盖其他方法。

设置

Vertica 将在启动程序节点上调用 plan() 一次,以执行下列任务:

  • 检查用户已向 COPY 语句中的函数调用提供的参数,并提供错误消息(如果出现任何问题)。您可以通过从传递到 plan() 方法的 ServerInterface 的实例获取 ParamReader 对象来读取参数。

  • 确定群集中的哪些主机将读取该数据源。如何拆分工作取决于函数将读取的源。某些源可以跨多个主机进行拆分,例如从多个 URL 读取数据的源。其他源(例如主机文件系统上的单个本地文件)只能由单个特定主机读取。

    可以通过对 NodeSpecifyingPlanContext 对象调用 setTargetNodes() 方法来存储要读取数据源的主机的列表。此对象会传递到 plan() 方法中。

  • 存储各个主机所需的任何信息,以便处理传递到 plan() 方法的 NodeSpecifyingPlanContext 实例中的数据源。例如,您可以存储分配,用于告知每个主机要处理的数据源。plan() 方法仅在启动程序节点上运行,而 prepareUDSources() 方法则在每个读取数据源的主机上运行。因此,该对象是它们之间的唯一通信方式。

    您通过从 getWriter() 方法中获取 ParamWriter 对象在 NodeSpecifyingPlanContext 中存储数据。然后,通过对 ParamWriter 调用方法(比如 setString())写入参数。

创建源

Vertica 会在 plan() 方法已选择将数据加载到的所有主机上调用 prepareUDSources()。此调用会实例化并返回 UDSource 子类实例的列表。如果不使用并发加载,请为分配给主机进行处理的每个源返回一个 UDSource。如果要使用并发加载,请使用将 ExecutorPlanContext 作为参数的方法版本,并尽可能多地返回可供使用的源。您的工厂必须准确实施这些方法之一。

对于并发加载,您可以通过在传入的 ExecutorPlanContext 上调用 getLoadConcurrency() 来了解节点上可供运行 UDSource 实例的线程数。

定义参数

实施 getParameterTypes() 可定义源所使用的参数的名称和类型。Vertica 使用此信息对调用人员发出有关未知或缺失参数的警告。Vertica 会忽略未知参数并为缺失参数使用默认值。当您需要为函数定义类型和参数时,您不需要覆盖此方法。

请求并发加载的线程

当源工厂在执行程序节点上创建源时,默认情况下,它会为每个源创建一个线程。如果您的源可以使用多个线程,请实施 getDesiredThreads()。在调用 prepareUDSources() 之前,Vertica 会先调用此方法,因此您也可以使用它来决定要创建的源数量。返回工厂可用于源的线程数。已传入可用线程的最大数量,因此您可以将其考虑在内。方法返回的值只是一种提示,而不是保证;每个执行程序节点均可确定要分配的线程数。FilePortionSourceFactory 示例将实施此方法;请参阅 C++ 示例:并发加载

您可以允许源控制并行度,这意味着它可以通过实施 isSourceApportionable() 将单个输入拆分成多个加载流。即使此方法返回 true,也不保证源分摊该加载。然而,返回 false 表示解析器不会尝试这么做。有关详细信息,请参阅分摊加载

通常,实施 getDesiredThreads()SourceFactory 也使用分摊加载。但是,使用分摊加载不是必需的。例如,从 Kafka 流中读取的源可以使用多个线程而无需分摊。

API

SourceFactory API 提供了以下通过子类扩展的方法:

virtual void plan(ServerInterface &srvInterface, NodeSpecifyingPlanContext &planCtxt);

// must implement exactly one of prepareUDSources() or prepareUDSourcesExecutor()
virtual std::vector< UDSource * > prepareUDSources(ServerInterface &srvInterface,
            NodeSpecifyingPlanContext &planCtxt);

virtual std::vector< UDSource * > prepareUDSourcesExecutor(ServerInterface &srvInterface,
            ExecutorPlanContext &planCtxt);

virtual void getParameterType(ServerInterface &srvInterface,
            SizedColumnTypes &parameterTypes);

virtual bool isSourceApportionable();

ssize_t getDesiredThreads(ServerInterface &srvInterface,
            ExecutorPlanContext &planContext);

创建 SourceFactory 之后,您必须将其注册到 RegisterFactory 宏。

SourceFactory API 提供了以下通过子类扩展的方法:

public void plan(ServerInterface srvInterface, NodeSpecifyingPlanContext planCtxt)
    throws UdfException;

// must implement one overload of prepareUDSources()
public ArrayList< UDSource > prepareUDSources(ServerInterface srvInterface,
                NodeSpecifyingPlanContext planCtxt)
    throws UdfException;

public ArrayList< UDSource > prepareUDSources(ServerInterface srvInterface,
                ExecutorPlanContext planCtxt)
    throws UdfException;

public void getParameterType(ServerInterface srvInterface, SizedColumnTypes parameterTypes);

public boolean isSourceApportionable();

public int getDesiredThreads(ServerInterface srvInterface,
                ExecutorPlanContext planCtxt)
    throws UdfException;

1.3 - C++ 示例: CurlSource

使用 CurlSource 示例,您可以使用 cURL 通过 HTTP 打开和读入文件。该示例作为以下内容的一部分提供:/opt/vertica/sdk/examples/SourceFunctions/cURL.cpp

源代码实施

此示例使用位于 /opt/vertica/sdk/examples/HelperLibraries/ 中的 helper 库。

CurlSource 按区块加载数据。如果解析器遇到 EndOfFile 标记,则 process() 方法将返回 DONE。否则,该方法将返回 OUTPUT_NEEDED 并处理其他数据区块。helper 库中包含的函数(例如 url_fread()url_fopen())基于随 libcurl 库附带提供的示例。有关示例,请访问 http://curl.haxx.se/libcurl/c/fopen.html

setup() 函数可打开文件句柄,而 destroy() 函数可关闭文件句柄。它们都使用 helper 库中的函数。

class CurlSource : public UDSource {private:
    URL_FILE *handle;
    std::string url;
    virtual StreamState process(ServerInterface &srvInterface, DataBuffer &output) {
        output.offset = url_fread(output.buf, 1, output.size, handle);
        return url_feof(handle) ? DONE : OUTPUT_NEEDED;
    }
public:
    CurlSource(std::string url) : url(url) {}
    void setup(ServerInterface &srvInterface) {
        handle = url_fopen(url.c_str(),"r");
    }
    void destroy(ServerInterface &srvInterface) {
        url_fclose(handle);
    }
};

工厂实施

CurlSourceFactory 可生成 CurlSource 实例。

class CurlSourceFactory : public SourceFactory {public:
    virtual void plan(ServerInterface &srvInterface,
            NodeSpecifyingPlanContext &planCtxt) {
        std::vector<std::string> args = srvInterface.getParamReader().getParamNames();
       /* Check parameters */
        if (args.size() != 1 || find(args.begin(), args.end(), "url") == args.end()) {
            vt_report_error(0, "You must provide a single URL.");
        }
        /* Populate planData */
        planCtxt.getWriter().getStringRef("url").copy(
                                    srvInterface.getParamReader().getStringRef("url"));

        /* Assign Nodes */
        std::vector<std::string> executionNodes = planCtxt.getClusterNodes();
        while (executionNodes.size() > 1) executionNodes.pop_back();
        // Only run on the first node in the list.
        planCtxt.setTargetNodes(executionNodes);
    }
    virtual std::vector<UDSource*> prepareUDSources(ServerInterface &srvInterface,
            NodeSpecifyingPlanContext &planCtxt) {
        std::vector<UDSource*> retVal;
        retVal.push_back(vt_createFuncObj(srvInterface.allocator, CurlSource,
                planCtxt.getReader().getStringRef("url").str()));
        return retVal;
    }
    virtual void getParameterType(ServerInterface &srvInterface,
                                  SizedColumnTypes &parameterTypes) {
        parameterTypes.addVarchar(65000, "url");
    }
};
RegisterFactory(CurlSourceFactory);

1.4 - C++ 示例:并发加载

FilePortionSource 示例演示了如何使用并发加载。此示例是对 FileSource 示例进行的改进。每个输入文件都会拆分成多个部分并分发到 FilePortionSource 实例。源接受将输入分成多个部分所依据的偏移量列表;如果未提供偏移量,源将动态拆分输入。

并发加载将在工厂中进行处理,所以本文讨论的重点是 FilePortionSourceFactory。该示例的完整代码位于 /opt/vertica/sdk/examples/ApportionLoadFunctions 中。该分发还包括此示例的 Java 版本。

加载和使用示例

按如下所示加载并使用 FilePortionSource 示例。

=> CREATE LIBRARY FilePortionLib AS '/home/dbadmin/FP.so';

=> CREATE SOURCE FilePortionSource AS LANGUAGE 'C++'
-> NAME 'FilePortionSourceFactory' LIBRARY FilePortionLib;

=> COPY t WITH SOURCE FilePortionSource(file='g1/*.dat', nodes='initiator,e0,e1', offsets = '0,380000,820000');

=> COPY t WITH SOURCE FilePortionSource(file='g2/*.dat', nodes='e0,e1,e2', local_min_portion_size = 2097152);

实施

并发加载会在两个位置影响源工厂:getDesiredThreads()prepareUDSourcesExecutor()

getDesiredThreads()

getDesiredThreads() 成员函数可确定要请求的线程数。Vertica 会在调用 prepareUDSourcesExecutor() 之前在每个执行程序节点上调用此成员函数。

该函数会先开始将输入文件路径(可能为 glob)分成多个单独的路径。本讨论省略了这些细节。如果未使用分摊加载,则该函数为每个文件分配一个源。

virtual ssize_t getDesiredThreads(ServerInterface &srvInterface,
    ExecutorPlanContext &planCtxt) {
  const std::string filename = srvInterface.getParamReader().getStringRef("file").str();

  std::vector<std::string> paths;
  // expand the glob - at least one thread per source.
  ...

  // figure out how to assign files to sources
  const std::string nodeName = srvInterface.getCurrentNodeName();
  const size_t nodeId = planCtxt.getWriter().getIntRef(nodeName);
  const size_t numNodes = planCtxt.getTargetNodes().size();

  if (!planCtxt.canApportionSource()) {
    /* no apportioning, so the number of files is the final number of sources */
    std::vector<std::string> *expanded =
        vt_createFuncObject<std::vector<std::string> >(srvInterface.allocator, paths);
    /* save expanded paths so we don't have to compute expansion again */
    planCtxt.getWriter().setPointer("expanded", expanded);
    return expanded->size();
  }

  // ...

如果可以分摊源,则 getDesiredThreads() 会使用作为实参传递给工厂的偏移量,以将文件拆分成多个部分。然后,它会将这些部分分配给可用节点。此函数实际上不会直接分配源;完成这项工作是为了确定要请求的线程数。

  else if (srvInterface.getParamReader().containsParameter("offsets")) {

    // if the offsets are specified, then we will have a fixed number of portions per file.
    // Round-robin assign offsets to nodes.
    // ...

    /* Construct the portions that this node will actually handle.
     * This isn't changing (since the offset assignments are fixed),
     * so we'll build the Portion objects now and make them available
     * to prepareUDSourcesExecutor() by putting them in the ExecutorContext.
     *
     * We don't know the size of the last portion, since it depends on the file
     * size.  Rather than figure it out here we will indicate it with -1 and
     * defer that to prepareUDSourcesExecutor().
     */
    std::vector<Portion> *portions =
        vt_createFuncObject<std::vector<Portion>>(srvInterface.allocator);

    for (std::vector<size_t>::const_iterator offset = offsets.begin();
            offset != offsets.end(); ++offset) {
        Portion p(*offset);
        p.is_first_portion = (offset == offsets.begin());
        p.size = (offset + 1 == offsets.end() ? -1 : (*(offset + 1) - *offset));

        if ((offset - offsets.begin()) % numNodes == nodeId) {
            portions->push_back(p);
            srvInterface.log("FilePortionSource: assigning portion %ld: [offset = %lld, size = %lld]",
                    offset - offsets.begin(), p.offset, p.size);
        }
      }

该函数现在具有所有部分,因此可以知道部分的数量:

      planCtxt.getWriter().setPointer("portions", portions);

      /* total number of threads we want is the number of portions per file, which is fixed */
      return portions->size() * expanded->size();
    } // end of "offsets" parameter

如果未提供偏移量,则该函数会将文件动态拆分成多个部分,每个线程一个部分。本讨论省略了此计算过程的细节。请求的线程数多于可用线程数没有意义,因此该函数会对用 PlanContext(函数的实参)调用 getMaxAllowedThreads() 来设置上限:

  if (portions->size() >= planCtxt.getMaxAllowedThreads()) {
    return paths.size();
  }

有关此函数如何将文件拆分成多个部分的详细信息,请参阅完整示例。

此函数使用 vt_createFuncObject 模板来创建对象。Vertica 会调用使用此宏创建的返回对象的析构函数,但它不会调用其他对象(如向量)的析构函数。您必须自己调用这些析构函数以避免内存泄漏。在此示例中,这些调用是在 prepareUDSourcesExecutor() 中进行的。

prepareUDSourcesExecutor()

prepareUDSourcesExecutor() 成员函数(如 getDesiredThreads())包含单独的代码块,具体取决于是否提供了偏移量。在这两种情况下,该函数都会将输入拆分成多个部分并为它们创建 UDSource 实例。

如果使用偏移量调用函数,则 prepareUDSourcesExecutor() 会调用 prepareCustomizedPortions()。此函数如下所示。

/* prepare portions as determined via the "offsets" parameter */
void prepareCustomizedPortions(ServerInterface &srvInterface,
                               ExecutorPlanContext &planCtxt,
                               std::vector<UDSource *> &sources,
                               const std::vector<std::string> &expandedPaths,
                               std::vector<Portion> &portions) {
    for (std::vector<std::string>::const_iterator filename = expandedPaths.begin();
            filename != expandedPaths.end(); ++filename) {
        /*
         * the "portions" vector contains the portions which were generated in
         * "getDesiredThreads"
         */
        const size_t fileSize = getFileSize(*filename);
        for (std::vector<Portion>::const_iterator portion = portions.begin();
                portion != portions.end(); ++portion) {
            Portion fportion(*portion);
            if (fportion.size == -1) {
                /* as described above, this means from the offset to the end */
                fportion.size = fileSize - portion->offset;
                sources.push_back(vt_createFuncObject<FilePortionSource>(srvInterface.allocator,
                            *filename, fportion));
            } else if (fportion.size > 0) {
                sources.push_back(vt_createFuncObject<FilePortionSource>(srvInterface.allocator,
                            *filename, fportion));
            }
        }
    }
}

如果未使用偏移量调用 prepareUDSourcesExecutor(),则它必须决定要创建的部分数。

基本规则是每个源使用一个部分。但是,如果有额外的线程可用,该函数会将输入拆分成更多部分,以便源可以同时处理它们。然后,prepareUDSourcesExecutor() 会调用 prepareGeneratedPortions() 以创建这些部分。该函数会先开始在计划上下文中调用 getLoadConcurrency() 以找到可用的线程数。

void prepareGeneratedPortions(ServerInterface &srvInterface,
                              ExecutorPlanContext &planCtxt,
                              std::vector<UDSource *> &sources,
                              std::map<std::string, Portion> initialPortions) {

  if ((ssize_t) initialPortions.size() >= planCtxt.getLoadConcurrency()) {
  /* all threads will be used, don't bother splitting into portions */

  for (std::map<std::string, Portion>::const_iterator file = initialPortions.begin();
       file != initialPortions.end(); ++file) {
    sources.push_back(vt_createFuncObject<FilePortionSource>(srvInterface.allocator,
            file->first, file->second));
       } // for
    return;
  } // if

  // Now we can split files to take advantage of potentially-unused threads.
  // First sort by size (descending), then we will split the largest first.

  // details elided...

}

有关详细信息

有关此示例的完整实施,请参阅源代码。

1.5 - Java 示例: FileSource

本节中所示的示例是一个名为 FileSource 的简单 UDL 源函数,此函数可加载存储在主机文件系统上的文件中的数据(类似于标准 COPY 语句)。要调用 FileSource,您必须提供一个名为 file 的参数,并且该参数必须包含主机文件系统上的一个或多个文件的绝对路径。您可以使用逗号分隔列表格式指定多个文件。

FileSource 函数还接受一个名为 nodes 的可选参数,此可选参数指示哪些节点应加载文件。如果未提供此参数,则在默认情况下,函数仅在启动程序节点上加载数据。由于此示例是一个简单示例,因此节点仅加载自己的文件系统中的文件。file 参数中的任何文件必须存在于 nodes 参数中的所有主机上。FileSource UDSource 会尝试将 file 参数中的所有文件加载到 nodes 参数中的所有主机上。

生成文件

可以使用以下 Python 脚本生成文件并将这些文件分发给 Vertica 群集中的主机。使用这些文件,您可以试验示例 UDSource 函数。要运行此函数,您必须能够进行无密码 SSH 登录,以将文件复制到其他主机。因此,您必须使用数据库管理员帐户在数据库主机之一上运行该脚本。

#!/usr/bin/python
# Save this file as UDLDataGen.py
import string
import random
import sys
import os

# Read in the dictionary file to provide random words. Assumes the words
# file is located in /usr/share/dict/words
wordFile = open("/usr/share/dict/words")
wordDict = []
for line in wordFile:
    if len(line) > 6:
        wordDict.append(line.strip())

MAXSTR = 4 # Maximum number of words to concatentate
NUMROWS = 1000 # Number of rows of data to generate
#FILEPATH = '/tmp/UDLdata.txt' # Final filename to use for UDL source
TMPFILE = '/tmp/UDLtemp.txt'  # Temporary filename.

# Generate a random string by concatenating several words together. Max
# number of words set by MAXSTR
def randomWords():
    words = [random.choice(wordDict) for n in xrange(random.randint(1, MAXSTR))]
    sentence = " ".join(words)
    return sentence

# Create a temporary data file that will be moved to a node. Number of
# rows for the file is set by NUMROWS. Adds the name of the node which will
# get the file, to show which node loaded the data.
def generateFile(node):
    outFile = open(TMPFILE, 'w')
    for line in xrange(NUMROWS):
        outFile.write('{0}|{1}|{2}\n'.format(line,randomWords(),node))
    outFile.close()

# Copy the temporary file to a node. Only works if passwordless SSH login
# is enabled, which it is for the database administrator account on
# Vertica hosts.
def copyFile(fileName,node):
    os.system('scp "%s" "%s:%s"' % (TMPFILE, node, fileName) )

# Loop through the comma-separated list of nodes given in the first
# parameter, creating and copying data files whose full comma-separated
# paths are passed in the second parameter
for node in [x.strip() for x in sys.argv[1].split(',')]:
    for fileName in [y.strip() for y in sys.argv[2].split(',')]:
        print "generating file", fileName, "for", node
        generateFile(node)
        print "Copying file to",node
        copyFile(fileName,node)

您可以调用该脚本,并为其提供要接收文件的主机的逗号分隔列表和要生成的文件的绝对路径的逗号分隔列表。例如:

python UDLDataGen.py v_vmart_node0001,v_vmart_node0002,v_vmart_node0003 /tmp/UDLdata01.txt,/tmp/UDLdata02.txt,\
UDLdata03.txt

该脚本将生成包含一千个行(各列用管道字符 (|) 分隔)的文件。这些列包含一个索引值、一组随机词以及为其生成了文件的节点,如以下输出示例所示:

0|megabits embanks|v_vmart_node0001
1|unneatly|v_vmart_node0001
2|self-precipitation|v_vmart_node0001
3|antihistamine scalados Vatter|v_vmart_node0001

加载和使用示例

按如下所示加载并使用 FileSource UDSource:

=> --Load library and create the source function
=> CREATE LIBRARY JavaLib AS '/home/dbadmin/JavaUDlLib.jar'
-> LANGUAGE 'JAVA';
CREATE LIBRARY
=> CREATE SOURCE File as LANGUAGE 'JAVA' NAME
-> 'com.mycompany.UDL.FileSourceFactory' LIBRARY JavaLib;
CREATE SOURCE FUNCTION
=> --Create a table to hold the data loaded from files
=> CREATE TABLE t (i integer, text VARCHAR, node VARCHAR);
CREATE TABLE
=> -- Copy a single file from the currently host using the FileSource
=> COPY t SOURCE File(file='/tmp/UDLdata01.txt');
 Rows Loaded
-------------
        1000
(1 row)

=> --See some of what got loaded.
=> SELECT * FROM t WHERE i < 5 ORDER BY i;
 i |             text              |  node
---+-------------------------------+-----------------
 0 | megabits embanks              | v_vmart_node0001
 1 | unneatly                      | v_vmart_node0001
 2 | self-precipitation            | v_vmart_node0001
 3 | antihistamine scalados Vatter | v_vmart_node0001
 4 | fate-menaced toilworn         | v_vmart_node0001
(5 rows)



=> TRUNCATE TABLE t;
TRUNCATE TABLE
=> -- Now load a file from three hosts. All of these hosts must have a file
=> -- named /tmp/UDLdata01.txt, each with different data
=> COPY t SOURCE File(file='/tmp/UDLdata01.txt',
-> nodes='v_vmart_node0001,v_vmart_node0002,v_vmart_node0003');
 Rows Loaded
-------------
        3000
(1 row)

=> --Now see what has been loaded
=> SELECT * FROM t WHERE i < 5 ORDER BY i,node ;
 i |                      text                       |  node
---+-------------------------------------------------+--------
 0 | megabits embanks                                | v_vmart_node0001
 0 | nimble-eyed undupability frowsier               | v_vmart_node0002
 0 | Circean nonrepellence nonnasality               | v_vmart_node0003
 1 | unneatly                                        | v_vmart_node0001
 1 | floatmaker trabacolos hit-in                    | v_vmart_node0002
 1 | revelrous treatableness Halleck                 | v_vmart_node0003
 2 | self-precipitation                              | v_vmart_node0001
 2 | whipcords archipelagic protodonatan copycutter  | v_vmart_node0002
 2 | Paganalian geochemistry short-shucks            | v_vmart_node0003
 3 | antihistamine scalados Vatter                   | v_vmart_node0001
 3 | swordweed touristical subcommanders desalinized | v_vmart_node0002
 3 | batboys                                         | v_vmart_node0003
 4 | fate-menaced toilworn                           | v_vmart_node0001
 4 | twice-wanted cirrocumulous                      | v_vmart_node0002
 4 | doon-head-clock                                 | v_vmart_node0003
(15 rows)

=> TRUNCATE TABLE t;
TRUNCATE TABLE
=> --Now copy from several files on several hosts
=> COPY t SOURCE File(file='/tmp/UDLdata01.txt,/tmp/UDLdata02.txt,/tmp/UDLdata03.txt'
-> ,nodes='v_vmart_node0001,v_vmart_node0002,v_vmart_node0003');
 Rows Loaded
-------------
        9000
(1 row)

=> SELECT * FROM t WHERE i = 0 ORDER BY node ;
 i |                    text                     |  node
---+---------------------------------------------+--------
 0 | Awolowo Mirabilis D'Amboise                 | v_vmart_node0001
 0 | sortieing Divisionism selfhypnotization     | v_vmart_node0001
 0 | megabits embanks                            | v_vmart_node0001
 0 | nimble-eyed undupability frowsier           | v_vmart_node0002
 0 | thiaminase hieroglypher derogated soilborne | v_vmart_node0002
 0 | aurigraphy crocket stenocranial             | v_vmart_node0002
 0 | Khulna pelmets                              | v_vmart_node0003
 0 | Circean nonrepellence nonnasality           | v_vmart_node0003
 0 | matterate protarsal                         | v_vmart_node0003
(9 rows)

解析器实施

以下代码显示了从主机文件系统读取文件的 FileSource 类的源。FileSourceFactory.prepareUDSources() 所调用的构造函数可获取包含要读取的数据的文件的绝对文件。setup() 方法可打开文件,而 destroy() 方法可关闭文件。process() 方法可读取文件中的数据并将数据写入到缓冲区(由作为参数传递的 DataBuffer 类的实例提供)。如果读取操作已将输出缓冲区填满,则该方法将返回 OUTPUT_NEEDED。此值可指示 Vertica 在加载的下一个阶段已处理输出缓冲区之后再次调用该方法。如果读取操作未将输出缓冲区填满,则 process() 将返回 DONE,以指示已完成对数据源的处理。

package com.mycompany.UDL;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.io.RandomAccessFile;

import com.vertica.sdk.DataBuffer;
import com.vertica.sdk.ServerInterface;
import com.vertica.sdk.State.StreamState;
import com.vertica.sdk.UDSource;
import com.vertica.sdk.UdfException;

public class FileSource extends UDSource {

    private String filename;  // The file for this UDSource to read
    private RandomAccessFile reader;   // handle to read from file


    // The constructor just stores the absolute filename of the file it will
    // read.
    public FileSource(String filename) {
        super();
        this.filename = filename;
    }

    // Called before Vertica starts requesting data from the data source.
    // In this case, setup needs to open the file and save to the reader
    // property.
    @Override
    public void setup(ServerInterface srvInterface ) throws UdfException{
        try {
            reader = new RandomAccessFile(new File(filename), "r");
        } catch (FileNotFoundException e) {
            // In case of any error, throw a UDfException. This will terminate
            // the data load.
             String msg = e.getMessage();
             throw new UdfException(0, msg);
        }
    }

    // Called after data has been loaded. In this case, close the file handle.
    @Override
    public void destroy(ServerInterface srvInterface ) throws UdfException {
        if (reader != null) {
            try {
                reader.close();
            } catch (IOException e) {
                String msg = e.getMessage();
                 throw new UdfException(0, msg);
            }
        }
    }

    @Override
    public StreamState process(ServerInterface srvInterface, DataBuffer output)
                                throws UdfException {

        // Read up to the size of the buffer provided in the DataBuffer.buf
        // property. Here we read directly from the file handle into the
        // buffer.
        long offset;
        try {
            offset = reader.read(output.buf,output.offset,
                                 output.buf.length-output.offset);
        } catch (IOException e) {
            // Throw an exception in case of any errors.
            String msg = e.getMessage();
            throw new UdfException(0, msg);
        }

        // Update the number of bytes processed so far by the data buffer.
        output.offset +=offset;

        // See end of data source has been reached, or less data was read
        // than can fit in the buffer
        if(offset == -1 || offset < output.buf.length) {
            // No more data to read.
            return StreamState.DONE;
        }else{
            // Tell Vertica to call again when buffer has been emptied
            return StreamState.OUTPUT_NEEDED;
        }
    }
}

工厂实施

以下代码是 Java UDx 支持包中提供的示例 Java UDsource 函数的修改版本。可以在 /opt/vertica/sdk/examples/JavaUDx/UDLFuctions/com/vertica/JavaLibs/FileSourceFactory.java 中找到完整示例。通过覆盖 plan() 方法,可验证用户是否提供了必需的 file 参数。如果用户还提供了可选的 nodes 参数,该方法还可以验证节点是否存在于 Vertica 群集中。如果任一参数存在问题,该方法将抛出异常并向用户返回错误。如果两个参数都没有问题,则 plan() 方法会将其值存储在计划上下文对象中。

package com.mycompany.UDL;

import java.util.ArrayList;
import java.util.Vector;
import com.vertica.sdk.NodeSpecifyingPlanContext;
import com.vertica.sdk.ParamReader;
import com.vertica.sdk.ParamWriter;
import com.vertica.sdk.ServerInterface;
import com.vertica.sdk.SizedColumnTypes;
import com.vertica.sdk.SourceFactory;
import com.vertica.sdk.UDSource;
import com.vertica.sdk.UdfException;

public class FileSourceFactory extends SourceFactory {

    // Called once on the initiator host to do initial setup. Checks
    // parameters and chooses which nodes will do the work.
    @Override
    public void plan(ServerInterface srvInterface,
            NodeSpecifyingPlanContext planCtxt) throws UdfException {

        String nodes; // stores the list of nodes that will load data

        // Get  copy of the parameters the user supplied to the UDSource
        // function call.
        ParamReader args =  srvInterface.getParamReader();

        // A list of nodes that will perform work. This gets saved as part
        // of the plan context.
        ArrayList<String> executionNodes = new ArrayList<String>();

        // First, ensure the user supplied the file parameter
        if (!args.containsParameter("file")) {
            // Withut a file parameter, we cannot continue. Throw an
            // exception that will be caught by the Java UDx framework.
            throw new UdfException(0, "You must supply a file parameter");
        }

        // If the user specified nodes to read the file, parse the
        // comma-separated list and save. Otherwise, assume just the
        // Initiator node has the file to read.
        if (args.containsParameter("nodes")) {
            nodes = args.getString("nodes");

            // Get list of nodes in cluster, to ensure that the node the
            // user specified actually exists. The list of nodes is available
            // from the planCTxt (plan context) object,
            ArrayList<String> clusterNodes = planCtxt.getClusterNodes();

            // Parse the string parameter "nodes" which
            // is a comma-separated list of node names.
            String[] nodeNames = nodes.split(",");

            for (int i = 0; i < nodeNames.length; i++){
                // See if the node the user gave us actually exists
                if(clusterNodes.contains(nodeNames[i]))
                    // Node exists. Add it to list of nodes.
                    executionNodes.add(nodeNames[i]);
                else{
                    // User supplied node that doesn't exist. Throw an
                    // exception so the user is notified.
                    String msg = String.format("Specified node '%s' but no" +
                        " node by that name is available.  Available nodes "
                        + "are \"%s\".",
                        nodeNames[i], clusterNodes.toString());
                    throw new UdfException(0, msg);
                }
            }
        } else {
            // User did not supply a list of node names. Assume the initiator
            // is the only host that will read the file. The srvInterface
            // instance passed to this method has a getter for the current
            // node.
            executionNodes.add(srvInterface.getCurrentNodeName());
        }

        // Set the target node(s) in the plan context
        planCtxt.setTargetNodes(executionNodes);

        // Set parameters for each node reading data that tells it which
        // files it will read. In this simple example, just tell it to
        // read all of the files the user passed in the file parameter
        String files = args.getString("file");

        // Get object to write parameters into the plan context object.
        ParamWriter nodeParams = planCtxt.getWriter();

        // Loop through list of execution nodes, and add a parameter to plan
        // context named for each node performing the work, which tells it the
        // list of files it will process. Each node will look for a
        // parameter named something like "filesForv_vmart_node0002" in its
        // prepareUDSources() method.
        for (int i = 0; i < executionNodes.size(); i++) {
            nodeParams.setString("filesFor" + executionNodes.get(i), files);
        }
    }

    // Called on each host that is reading data from a source. This method
    // returns an array of UDSource objects that process each source.
    @Override
    public ArrayList<UDSource> prepareUDSources(ServerInterface srvInterface,
            NodeSpecifyingPlanContext planCtxt) throws UdfException {

        // An array to hold the UDSource subclasses that we instaniate
        ArrayList<UDSource> retVal = new ArrayList<UDSource>();

        // Get the list of files this node is supposed to process. This was
        // saved by the plan() method in the plancontext
        String myName = srvInterface.getCurrentNodeName();
        ParamReader params = planCtxt.getReader();
        String fileNames = params.getString("filesFor" + myName);

        // Note that you can also be lazy and directly grab the parameters
        // the user passed to the UDSource functon in the COPY statement directly
        // by getting parameters from the ServerInterface object. I.e.:

        //String fileNames = srvInterface.getParamReader().getString("file");

        // Split comma-separated list into a single list.
        String[] fileList = fileNames.split(",");
        for (int i = 0; i < fileList.length; i++){
            // Instantiate a FileSource object (which is a subclass of UDSource)
            // to read each file. The constructor for FileSource takes the
            // file name of the
            retVal.add(new FileSource(fileList[i]));
        }

        // Return the collection of FileSource objects. They will be called,
        // in turn, to read each of the files.
        return retVal;
    }

    // Declares which parameters that this factory accepts.
    @Override
    public void getParameterType(ServerInterface srvInterface,
                                    SizedColumnTypes parameterTypes) {
        parameterTypes.addVarchar(65000, "file");
        parameterTypes.addVarchar(65000, "nodes");
    }
}

2 - 用户自定义的筛选器

使用用户定义的筛选器,您可以通过多种方式处理从源获取的数据。例如,筛选器可以执行下列操作:

  • 处理其压缩格式不受 Vertica 原生支持的压缩文件。

  • 接收 UTF-16 编码数据并将其转码为 UTF-8 编码。

  • 对数据执行搜索和替换操作,然后再将数据加载到 Vertica 中。

您还可以通过多个筛选器处理数据,然后再将数据加载到 Vertica 中。例如,您可以解压缩使用 GZip 格式压缩的文件,然后将内容从 UTF-16 转换为 UTF-8,最后搜索并替换某些文本字符串。

如果您实施 UDFilter,还必须实施相应的 FilterFactory

有关 API 详细信息,请参阅 UDFilter 类FilterFactory 类

2.1 - UDFilter 类

UDFilter 类负责从源读取原始输入数据,以及使数据准备好加载到 Vertica 或由解析器处理。此准备可能涉及解压缩、重新编码或任何其他类型的二进制处理。

UDFilter 由 Vertica 群集中对数据源执行筛选的每个主机上相应的 FilterFactory 实例化。

UDFilter 方法

您的 UDFilter 子类必须覆盖 process()processWithMetadata()

  • process() 将原始输入流作为一个大文件读取。如果有任何错误或故障,整个加载将失败。
    上游源实施 processWithMetadata() 时可以实施 process(),但可能会导致解析错误。

  • processWithMetadata() 当数据源具有以某种结构化格式(与数据有效负载分开)提供的关于记录边界的元数据时很有用。使用此接口,源除了发出数据之外,还会为每个记录发出记录长度。

    通过在每个阶段实施 processWithMetadata() 而不是 process(),您可以在整个加载堆栈中保留此记录长度元数据,从而实施更有效的解析,以在每个消息的基础上而不是每个文件或每个源的基础上从错误中恢复。 KafkaSource 和 Kafka 解析器(KafkaAvroParserKafkaJSONParserKafkaParser)在单个 Kafka 消息损坏时使用这种机制来支持每个 Kafka 消息的拒绝。

    processWithMetadata()UDFilter 子类一起使用,您可以编写一个内部筛选器,该筛选器将源中的记录长度元数据集成到数据流中,生成带边界信息的单字节流以帮助解析器提取和处理单个消息。KafkaInsertDelimetersKafkaInsertLengths 使用这种机制将消息边界信息插入到 Kafka 数据流中。

或者,您可以覆盖其他 UDFilter 类方法。

筛选器执行

以下部分详细说明了每次调用用户定义的筛选器时的执行序列。以下示例覆盖 process() 方法。

正在设置
COPY 在第一次调用 process() 之前调用了 setup()。使用 setup() 可执行任何必要的设置步骤以使过滤器正常工作,例如,初始化数据结构以在过滤期间使用。您的对象可能在使用期间已损坏并重新创建,因此请确保您的对象可以重新启动。

过滤数据
COPY 将在查询执行期间重复调用 process() 以过滤数据。此方法将在其参数中收到 DataBuffer 类的两个实例以及一个输入缓冲区和一个输出缓冲区。您的实施应从输入缓冲区读取数据,并以某种方式(例如解压缩)处理数据,然后将结果写入到输出缓冲区。您的实施所读取的字节数和所写入的字节数之间可能不存在一对一关联。process() 方法应处理数据,直至没有更多数据可读取或输出缓冲区中的空间已用完。当出现这两种情况之一时,该方法应返回以下由 StreamState 定义的值之一:

  • OUTPUT_NEEDED,如果过滤器的输出缓冲区需要更多空间。

  • INPUT_NEEDED,如果过滤器已用完输入数据(但尚未完全处理数据源)。

  • DONE,如果过滤器已处理数据源中的所有数据。

  • KEEP_GOING,如果过滤器在很长一段时间内无法继续执行操作。系统会再次调用该方法,因此请勿无限期阻止该方法,否则会导致用户无法取消查询。

返回之前,process() 方法必须在每个 DataBuffer 中设置 offset 属性。在输入缓冲区中,请将该属性设置为方法成功读取的字节数。在输出缓冲区中,请将该属性设置为方法已写入的字节数。通过设置这些属性,对 process() 发出的下一次调用可以在缓冲区中的正确位置继续读取和写入数据。

process() 方法还需要检查传递给它的 InputState 对象,以确定数据源中是否存在更多数据。如果此对象等于 END_OF_FILE,则输入数据中剩余的数据是数据源中的最后数据。处理完所有剩余的数据后,process() 必须返回 DONE。

分解
COPY 将在最后一次调用 process() 之后调用 destroy()。此方法可释放由 setup()process() 方法预留的任何资源。Vertica 将在 process() 方法指示已完成对数据流中所有数据的筛选之后调用此方法。

如果仍有数据源尚未处理,Vertica 可能会在稍后再次对该对象调用 setup()。在后续发出调用时,Vertica 会指示该方法筛选新的数据流中的数据。因此,destroy() 方法应使 UDFilter 子类的对象处于某种状态,以便 setup() 方法可为重复使用该对象做好准备。

API

UDFilter API 提供了以下通过子类扩展的方法:

virtual void setup(ServerInterface &srvInterface);

virtual bool useSideChannel();

virtual StreamState process(ServerInterface &srvInterface, DataBuffer &input, InputState input_state, DataBuffer &output)=0;

virtual StreamState processWithMetadata(ServerInterface &srvInterface, DataBuffer &input,
    LengthBuffer &input_lengths, InputState input_state, DataBuffer &output, LengthBuffer &output_lengths)=0;

virtual void cancel(ServerInterface &srvInterface);

virtual void destroy(ServerInterface &srvInterface);

UDFilter API 提供了以下通过子类扩展的方法:

public void setup(ServerInterface srvInterface) throws UdfException;

public abstract StreamState process(ServerInterface srvInterface, DataBuffer input,
                InputState input_state, DataBuffer output)
    throws UdfException;

protected void cancel(ServerInterface srvInterface);

public void destroy(ServerInterface srvInterface) throws UdfException;

UDFilter API 提供了以下通过子类扩展的方法:

class PyUDFilter(vertica_sdk.UDFilter):
    def __init__(self):
        pass

    def setup(self, srvInterface):
        pass

    def process(self, srvInterface, inputbuffer, outputbuffer, inputstate):
        # User process data here, and put into outputbuffer.
        return StreamState.DONE

2.2 - FilterFactory 类

如果编写过滤器,您还必须编写用于生成过滤器实例的过滤器工厂。为此,请对 FilterFactory 类设置子类。

子类将执行函数执行的初始验证和规划工作,并将每个主机(将进行数据过滤)上的 UDFilter 对象实例化。

过滤器工厂是单例。您的子类必须为无状态子类,没有包含数据的字段。该子类还不得修改任何全局变量。

FilterFactory 方法

FilterFactory 类定义了以下方法。您的子类必须覆盖 prepare() 方法。可以覆盖其他方法。

设置

Vertica 将在启动程序节点上调用 plan() 一次,以执行下列任务:

  • 检查已从 COPY 语句中的函数调用传递的任何参数和错误消息(如果出现任何问题)。您通过从传递至 plan() 方法的 ServerInterface 的实例中获得 ParamReader 对象读取参数。

  • 存储各个主机所需的任何信息,以便过滤作为参数传递的 PlanContext 实例中的数据。例如,您可以存储过滤器将读取的输入格式的详细信息,并输出该过滤器应生成的格式。plan() 方法仅在启动程序节点上运行,而 prepare() 方法则在每个读取数据源的主机上运行。因此,该对象是它们之间的唯一通信方式。

    您通过从 getWriter() 方法中获取 ParamWriter 对象在 PlanContext 中存储数据。然后,通过对 ParamWriter 调用方法(比如 setString)写入参数。

创建筛选器

Vertica 将调用 prepare(),以创建并初始化筛选器。它在将执行过滤的每个节点上调用此方法一次。Vertica 根据可用资源自动选择可完成该工作的最佳节点。您无法指定在哪些节点上完成工作。

定义参数

实施 getParameterTypes() 可定义过滤器所使用的参数的名称和类型。Vertica 使用此信息对调用人员发出有关未知或缺失参数的警告。Vertica 会忽略未知参数并为缺失参数使用默认值。当您需要为函数定义类型和参数时,您不需要覆盖此方法。

API

FilterFactory API 提供了以下通过子类扩展的方法:

virtual void plan(ServerInterface &srvInterface, PlanContext &planCtxt);

virtual UDFilter * prepare(ServerInterface &srvInterface, PlanContext &planCtxt)=0;

virtual void getParameterType(ServerInterface &srvInterface, SizedColumnTypes &parameterTypes);

创建 FilterFactory 之后,您必须将其注册到 RegisterFactory 宏。

FilterFactory API 提供了以下通过子类扩展的方法:

public void plan(ServerInterface srvInterface, PlanContext planCtxt)
    throws UdfException;

public abstract UDFilter prepare(ServerInterface srvInterface, PlanContext planCtxt)
    throws UdfException;

public void getParameterType(ServerInterface srvInterface, SizedColumnTypes parameterTypes);

FilterFactory API 提供了以下通过子类扩展的方法:


class PyFilterFactory(vertica_sdk.SourceFactory):
    def __init__(self):
        pass
    def plan(self):
        pass
    def prepare(self, planContext):
        #User implement the function to create PyUDSources.
        pass

2.3 - Java 示例: ReplaceCharFilter

本节中的示例演示了创建一个 UDFilter,以用于在输出流中将输入流中某个字符的任何实例替换为另一个字符。此示例是高度简化的示例,并假设输入流为 ASCII 数据。

始终应记住的是,UDFilter 中的输入流和输出流实际上是二进制数据。如果要使用 UDFilter 执行字符转换,请将数据流从字节字符串转换为正确编码的字符串。例如,输入流可能包含 UTF-8 编码文本。如果是的话,务必先将要从缓冲区中读取的原始二进制数据转换为 UTF 字符串,然后再进行处理。

加载和使用示例

示例 UDFilter 具有两个必需参数。from_char 参数指定了要替换的字符,而 to_char 参数指定了替换字符。按如下所示加载并使用 ReplaceCharFilter UDFilter:

=> CREATE LIBRARY JavaLib AS '/home/dbadmin/JavaUDlLib.jar'
->LANGUAGE 'JAVA';
CREATE LIBRARY
=> CREATE FILTER ReplaceCharFilter as LANGUAGE 'JAVA'
->name 'com.mycompany.UDL.ReplaceCharFilterFactory' library JavaLib;
CREATE FILTER FUNCTION
=> CREATE TABLE t (text VARCHAR);
CREATE TABLE
=> COPY t FROM STDIN WITH FILTER ReplaceCharFilter(from_char='a', to_char='z');
Enter data to be copied followed by a newline.
End with a backslash and a period on a line by itself.
>> Mary had a little lamb
>> a man, a plan, a canal, Panama
>> \.

=> SELECT * FROM t;
              text
--------------------------------
 Mzry hzd z little lzmb
 z mzn, z plzn, z cznzl, Pznzmz
(2 rows)

=> --Calling the filter with incorrect parameters returns errors
=> COPY t from stdin with filter ReplaceCharFilter();
ERROR 3399:  Failure in UDx RPC call InvokePlanUDL(): Error in User Defined Object [
ReplaceCharFilter], error code: 0
com.vertica.sdk.UdfException: You must supply two parameters to ReplaceChar: 'from_char' and 'to_char'
        at com.vertica.JavaLibs.ReplaceCharFilterFactory.plan(ReplaceCharFilterFactory.java:22)
        at com.vertica.udxfence.UDxExecContext.planUDFilter(UDxExecContext.java:889)
        at com.vertica.udxfence.UDxExecContext.planCurrentUDLType(UDxExecContext.java:865)
        at com.vertica.udxfence.UDxExecContext.planUDL(UDxExecContext.java:821)
        at com.vertica.udxfence.UDxExecContext.run(UDxExecContext.java:242)
        at java.lang.Thread.run(Thread.java:662)

解析器实施

ReplaceCharFilter 类将读取数据流,并将用户指定字符的每个实例替换为另一个字符。

package com.vertica.JavaLibs;

import com.vertica.sdk.DataBuffer;
import com.vertica.sdk.ServerInterface;
import com.vertica.sdk.State.InputState;
import com.vertica.sdk.State.StreamState;
import com.vertica.sdk.UDFilter;

public class ReplaceCharFilter extends UDFilter {

    private byte[] fromChar;
    private byte[] toChar;

    public ReplaceCharFilter(String fromChar, String toChar){
        // Stores the from char and to char as byte arrays. This is
        // not a robust method of doing this, but works for this simple
        // example.
        this.fromChar= fromChar.getBytes();
        this.toChar=toChar.getBytes();
    }
    @Override
    public StreamState process(ServerInterface srvInterface, DataBuffer input,
            InputState input_state, DataBuffer output) {

        // Check if there is no more input and the input buffer has been completely
        // processed. If so, filtering is done.
        if (input_state == InputState.END_OF_FILE && input.buf.length == 0) {
            return StreamState.DONE;
        }

        // Get current position in the input buffer
        int offset = output.offset;

        // Determine how many bytes to process. This is either until input
        // buffer is exhausted or output buffer is filled
        int limit = Math.min((input.buf.length - input.offset),
                (output.buf.length - output.offset));

        for (int i = input.offset; i < limit; i++) {
            // This example just replaces each instance of from_char
            // with to_char. It does not consider things such as multi-byte
            // UTF-8 characters.
            if (input.buf[i] == fromChar[0]) {
                output.buf[i+offset] = toChar[0];
            } else {
                // Did not find from_char, so copy input to the output
                output.buf[i+offset]=input.buf[i];
            }
        }

        input.offset += limit;
        output.offset += input.offset;

        if (input.buf.length - input.offset < output.buf.length - output.offset) {
            return StreamState.INPUT_NEEDED;
        } else {
            return StreamState.OUTPUT_NEEDED;
        }
    }
}

工厂实施

ReplaceCharFilterFactory 需要两个参数(from_charto_char)。plan() 方法可验证这些参数是否存在以及是否为单字符字符串。然后,此方法会将它们存储在计划上下文中。prepare() 方法可获取参数值并将参数值传递到 ReplaceCharFilter 对象,然后将这些对话实例化以执行过滤。

package com.vertica.JavaLibs;

import java.util.ArrayList;
import java.util.Vector;

import com.vertica.sdk.FilterFactory;
import com.vertica.sdk.PlanContext;
import com.vertica.sdk.ServerInterface;
import com.vertica.sdk.SizedColumnTypes;
import com.vertica.sdk.UDFilter;
import com.vertica.sdk.UdfException;

public class ReplaceCharFilterFactory extends FilterFactory {

    // Run on the initiator node to perform varification and basic setup.
    @Override
    public void plan(ServerInterface srvInterface,PlanContext planCtxt)
                     throws UdfException {
        ArrayList<String> args =
                          srvInterface.getParamReader().getParamNames();

        // Ensure user supplied two arguments
        if (!(args.contains("from_char") && args.contains("to_char"))) {
            throw new UdfException(0, "You must supply two parameters" +
                        " to ReplaceChar: 'from_char' and 'to_char'");
        }

        // Verify that the from_char is a single character.
        String fromChar = srvInterface.getParamReader().getString("from_char");
        if (fromChar.length() != 1) {
            String message =  String.format("Replacechar expects a single " +
                "character in the 'from_char' parameter. Got length %d",
                fromChar.length());
            throw new UdfException(0, message);
        }

        // Save the from character in the plan context, to be read by
        // prepare() method.
        planCtxt.getWriter().setString("fromChar",fromChar);

        // Ensure to character parameter is a single characater
        String toChar = srvInterface.getParamReader().getString("to_char");
        if (toChar.length() != 1) {
            String message =  String.format("Replacechar expects a single "
                 + "character in the 'to_char' parameter. Got length %d",
                toChar.length());
            throw new UdfException(0, message);
        }
        // Save the to character in the plan data
        planCtxt.getWriter().setString("toChar",toChar);
    }

    // Called on every host that will filter data. Must instantiate the
    // UDFilter subclass.
    @Override
    public UDFilter prepare(ServerInterface srvInterface, PlanContext planCtxt)
                            throws UdfException {
        // Get data stored in the context by the plan() method.
        String fromChar = planCtxt.getWriter().getString("fromChar");
        String toChar = planCtxt.getWriter().getString("toChar");

        // Instantiate a filter object to perform filtering.
        return new ReplaceCharFilter(fromChar, toChar);
    }

    // Describe the parameters accepted by this filter.
    @Override
    public void getParameterType(ServerInterface srvInterface,
             SizedColumnTypes parameterTypes) {
        parameterTypes.addVarchar(1, "from_char");
        parameterTypes.addVarchar(1, "to_char");
    }
}

2.4 - C++ 示例:转换编码

以下示例显示了如何通过将 UTF-16 编码数据转换为 UTF-8 来将文件的编码从一种类型转换为另一种类型。您可以在 SDK 的 /opt/vertica/sdk/examples/FilterFunctions/IConverter.cpp 目录中找到此示例。

筛选器实施

class Iconverter : public UDFilter{
private:
    std::string fromEncoding, toEncoding;
    iconv_t cd; // the conversion descriptor opened
    uint converted; // how many characters have been converted
protected:
    virtual StreamState process(ServerInterface &srvInterface, DataBuffer &input,
                                InputState input_state, DataBuffer &output)
    {
        char *input_buf = (char *)input.buf + input.offset;
        char *output_buf = (char *)output.buf + output.offset;
        size_t inBytesLeft = input.size - input.offset, outBytesLeft = output.size - output.offset;
        // end of input
        if (input_state == END_OF_FILE && inBytesLeft == 0)
        {
            // Gnu libc iconv doc says, it is good practice to finalize the
            // outbuffer for stateful encodings (by calling with null inbuffer).
            //
            // http://www.gnu.org/software/libc/manual/html_node/Generic-Conversion-Interface.html
            iconv(cd, NULL, NULL, &output_buf, &outBytesLeft);
            // output buffer can be updated by this operation
            output.offset = output.size - outBytesLeft;
            return DONE;
        }
        size_t ret = iconv(cd, &input_buf, &inBytesLeft, &output_buf, &outBytesLeft);
        // if conversion is successful, we ask for more input, as input has not reached EOF.
        StreamState retStatus = INPUT_NEEDED;
        if (ret == (size_t)(-1))
        {
            // seen an error
            switch (errno)
            {
            case E2BIG:
                // input size too big, not a problem, ask for more output.
                retStatus = OUTPUT_NEEDED;
                break;
            case EINVAL:
                // input stops in the middle of a byte sequence, not a problem, ask for more input
                retStatus = input_state == END_OF_FILE ? DONE : INPUT_NEEDED;
                break;
            case EILSEQ:
                // invalid sequence seen, throw
                // TODO: reporting the wrong byte position
                vt_report_error(1, "Invalid byte sequence when doing %u-th conversion", converted);
            case EBADF:
                // something wrong with descriptor, throw
                vt_report_error(0, "Invalid descriptor");
            default:
                vt_report_error(0, "Uncommon Error");
                break;
            }
        }
        else converted += ret;
        // move position pointer
        input.offset = input.size - inBytesLeft;
        output.offset = output.size - outBytesLeft;
        return retStatus;
    }
public:
    Iconverter(const std::string &from, const std::string &to)
    : fromEncoding(from), toEncoding(to), converted(0)
    {
        // note "to encoding" is first argument to iconv...
        cd = iconv_open(to.c_str(), from.c_str());
        if (cd == (iconv_t)(-1))
        {
            // error when creating converters.
            vt_report_error(0, "Error initializing iconv: %m");
        }
    }
    ~Iconverter()
    {
        // free iconv resources;
        iconv_close(cd);
    }
};

工厂实施

class IconverterFactory : public FilterFactory{
public:
    virtual void plan(ServerInterface &srvInterface,
            PlanContext &planCtxt) {
        std::vector<std::string> args = srvInterface.getParamReader().getParamNames();
        /* Check parameters */
        if (!(args.size() == 0 ||
                (args.size() == 1 && find(args.begin(), args.end(), "from_encoding")
                        != args.end()) || (args.size() == 2
                        && find(args.begin(), args.end(), "from_encoding") != args.end()
                        && find(args.begin(), args.end(), "to_encoding") != args.end()))) {
            vt_report_error(0, "Invalid arguments.  Must specify either no arguments,  or "
                               "'from_encoding' alone, or 'from_encoding' and 'to_encoding'.");
        }
        /* Populate planData */
        // By default, we do UTF16->UTF8, and x->UTF8
        VString from_encoding = planCtxt.getWriter().getStringRef("from_encoding");
        VString to_encoding = planCtxt.getWriter().getStringRef("to_encoding");
        from_encoding.copy("UTF-16");
        to_encoding.copy("UTF-8");
        if (args.size() == 2)
        {
            from_encoding.copy(srvInterface.getParamReader().getStringRef("from_encoding"));
            to_encoding.copy(srvInterface.getParamReader().getStringRef("to_encoding"));
        }
        else if (args.size() == 1)
        {
            from_encoding.copy(srvInterface.getParamReader().getStringRef("from_encoding"));
        }
        if (!from_encoding.length()) {
            vt_report_error(0, "The empty string is not a valid from_encoding value");
        }
        if (!to_encoding.length()) {
            vt_report_error(0, "The empty string is not a valid to_encoding value");
        }
    }
    virtual UDFilter* prepare(ServerInterface &srvInterface,
            PlanContext &planCtxt) {
        return vt_createFuncObj(srvInterface.allocator, Iconverter,
                planCtxt.getReader().getStringRef("from_encoding").str(),
                planCtxt.getReader().getStringRef("to_encoding").str());
    }
    virtual void getParameterType(ServerInterface &srvInterface,
                                  SizedColumnTypes &parameterTypes) {
        parameterTypes.addVarchar(32, "from_encoding");
        parameterTypes.addVarchar(32, "to_encoding");
    }
};
RegisterFactory(IconverterFactory);

3 - 用户定义的解析器

解析器将接收字节流,并将相应的元组序列传递到 Vertica 加载进程。您可以使用用户定义的解析器函数解析以下数据:

  • Vertica 内置解析器无法理解其格式的数据。

  • 所需的控制比内置解析器提供的控制更精确的数据。

例如,您可以使用特定的 CSV 库加载 CSV 文件。请参阅 Vertica SDK 查看两个 CSV 示例。

COPY 支持单个用户定义的解析器,您可以将其与用户定义的源以及零个或多个用户定义的筛选器实例一起使用。如果您实施 UDParser 类,还必须实施相应的 ParserFactory

有时可以通过添加块分割器来提高解析器的性能。块分割器可拆分输入并使用多个线程来解析输入。块分割器仅在 C++ API 中可用。有关详细信息,请参阅“协作解析”和“UDChunker 类”。在某些特殊情况下,您可以通过使用*分摊加载*进一步提高性能,当使用此方法时,输入由多个 Vertica 节点进行解析。

3.1 - UDParser 类

如果需要解析 COPY 语句的原生解析器无法处理的数据格式,您可以将 UDParser 类子类化。

在解析器执行期间,Vertica 会始终调用三个方法:setup()process()destroy()。它还可能调用 getRejectedRecord()

UDParser 构造函数

UDParser 类可执行对所有子类来说必不可少的初始化,包括初始化解析器使用的 StreamWriter 对象。因此,构造函数必须调用 super()

UDParser 方法

您的 UDParser 子类必须覆盖 process()processWithMetadata()

  • process() 将原始输入流作为一个大文件读取。如果有任何错误或故障,整个加载将失败。您可以实施具有源或筛选器(可实施 processWithMetadata())的 process(),但这可能会导致解析错误。
    您可以在上游源或筛选器实施 processWithMetadata() 时实施 process(),但可能会导致解析错误。

  • processWithMetadata() 当数据源具有以某种结构化格式(与数据有效负载分开)提供的关于记录边界的元数据时很有用。使用此接口,源除了发出数据之外,还会为每个记录发出记录长度。

    通过在每个阶段实施 processWithMetadata() 而不是 process(),您可以在整个加载堆栈中保留此记录长度元数据,从而实施更有效的解析,以在每个消息的基础上而不是每个文件或每个源的基础上从错误中恢复。 KafkaSource 和 Kafka 解析器(KafkaAvroParserKafkaJSONParserKafkaParser)会在各条 Kafka 消息损坏时使用这种机制为拒绝每条 Kafka 消息提供支持。

此外,您必须覆盖 getRejectedRecord() 以返回有关被拒绝记录的信息。

或者,您可以覆盖其他 UDParser 类方法。

解析器执行

以下各节详细说明了每次调用用户定义的解析器时的执行序列。以下示例覆盖 process() 方法。

设置

COPY 在第一次调用 process() 之前调用了 setup()。使用 setup() 可执行任何初始设置任务,以使解析器能够解析数据。此设置包括从类上下文结构检索参数,或初始化数据结构以在过滤期间使用。在第一次调用 process() 方法之前,Vertica 会先调用此方法。您的对象可能在使用期间已损坏并重新创建,因此请确保您的对象可以重新启动。

解析

COPY 将在查询执行期间重复调用 process()。Vertica 会向此方法传递数据缓冲区以解析为列和行以及由 InputState 定义的以下输入状态之一:

  • OK:当前在流的开头或中间

  • END_OF_FILE:无更多数据可用。

  • END_OF_CHUNK:当前数据在记录边界处结束且解析器应在返回之前用完所有数据。此输入状态仅在使用块分割器时出现。

  • START_OF_PORTION:输入的起始位置不是源的开头。解析器应查找第一个记录结束标记。此输入状态仅在使用分摊加载时出现。您可以使用 getPortion() 方法访问相应部分的偏移量和大小。

  • END_OF_PORTION:源已到达其部分的结尾。解析器应完成处理它开始的最后一条记录且不再前进。此输入状态仅在使用分摊加载时出现。

解析器必须拒绝其无法解析的任何数据,以便 Vertica 可以报告拒绝并将拒绝的数据写入到文件中。

process() 方法必须从输入缓冲区解析尽可能多的数据。输入缓冲区可能不在行边界结束。因此,该方法可能不得不在输入行的中间停止解析并请求更多数据。如果源文件包含 null 字节,则输入可以包含 null 字节,而且不会自动以 null 终止。

解析器具有关联的 StreamWriter 对象,数据写入实际上由此对象执行。当解析器提取列值后,它会对 StreamWriter 使用特定于类型的方法之一以将该值写入到输出流。有关这些方法的详细信息,请参阅写入数据

process() 的一次调用可能会写入多行数据。当解析器结束对数据行的处理后,它必须对 StreamWriter 调用 next() 以使输出流前进到新行。(通常,解析器会完成处理一行,因为它遇到了行尾标记。)

process() 方法到达缓冲区结尾时,它会通过返回由 StreamState 定义的以下值之一来告知 Vertica 其当前状态:

  • INPUT_NEEDED:解析器已到达缓冲区的结尾且需要获取更多可供解析的数据。

  • DONE:解析器已到达输入数据流的结尾。

  • REJECT:解析器已拒绝所读取的最后一行数据(请参阅拒绝行)。

分解

COPY 将在最后一次调用 process() 之后调用 destroy()。此方法可释放由 setup()process() 方法预留的任何资源。

process() 方法指示它已完成对数据源的解析之后,Vertica 将调用此方法。但是,有时可能会剩余尚未处理的数据源。在这种情况下,Vertica 可能会在稍后再次对该对象调用 setup(),并让此方法解析新的数据流中的数据。因此,请编写 destroy() 方法,以使 UDParser 子类的实例处于某种状态,从而可以安全地再次调用 setup()

报告拒绝

如果 process() 拒绝某行,Vertica 将调用 getRejectedRecord() 以报告该拒绝。通常,此方法将返回 RejectedRecord 类的实例和拒绝的行的详细信息。

写入数据

解析器具有关联的 StreamWriter 对象,您可以通过调用 getStreamWriter() 来访问此对象。在实施 process() 时,对 StreamWriter 对象使用 setType() 方法可将行中的值写入到特定的列索引。请验证写入的数据类型是否与架构所需的数据类型匹配。

以下示例显示了如何将类型为 long 的值写入到当前行中的第四列(索引 3):

StreamWriter writer = getStreamWriter();
...
writer.setLongValue(3, 98.6);

StreamWriter 为所有基本类型提供多种方法,例如 setBooleanValue()setStringValue() 等。有关 StreamWriter 方法的完整列表,包括使用基元类型的选项或将条目显式设置为 null 的选项,请参阅 API 文档。

拒绝行

如果解析器发现无法解析的数据,它应按照以下过程拒绝该行:

  1. 保存有关拒绝的行数据的详细信息和拒绝原因。这些信息片段可以直接存储在 RejectedRecord 对象中,或者也可以在需要使用之前将其存储在 UDParser 子类上的字段中。

  2. 通过更新 input.offset 来更新该行在输入缓冲区中的位置,以便可以从下一行继续解析。

  3. 通过返回值 StreamState.REJECT 来指示它已拒绝某个行。

  4. 返回 RejectedRecord 类的实例和有关拒绝的行的详细信息。

拆分大型负载

Vertica 提供了两种拆分大型负载的方法。 分摊加载 允许您在多个数据库节点之间分配负载。 协作解析 (仅限 C++)允许您在一个节点上的多个线程之间分配负载。

API

UDParser API 提供了以下通过子类扩展的方法:

virtual void setup(ServerInterface &srvInterface, SizedColumnTypes &returnType);

virtual bool useSideChannel();

virtual StreamState process(ServerInterface &srvInterface, DataBuffer &input, InputState input_state)=0;

virtual StreamState processWithMetadata(ServerInterface &srvInterface, DataBuffer &input,
                            LengthBuffer &input_lengths, InputState input_state)=0;

virtual void cancel(ServerInterface &srvInterface);

virtual void destroy(ServerInterface &srvInterface, SizedColumnTypes &returnType);

virtual RejectedRecord getRejectedRecord();

UDParser API 提供了以下通过子类扩展的方法:

public void setup(ServerInterface srvInterface, SizedColumnTypes returnType)
    throws UdfException;

public abstract StreamState process(ServerInterface srvInterface,
                DataBuffer input, InputState input_state)
    throws UdfException, DestroyInvocation;

protected void cancel(ServerInterface srvInterface);

public void destroy(ServerInterface srvInterface, SizedColumnTypes returnType)
    throws UdfException;

public RejectedRecord getRejectedRecord() throws UdfException;

UDParser 使用 StreamWriter 写入其输出。 StreamWriter 为所有基本类型提供多种方法,例如 setBooleanValue()setStringValue() 等。在 Java API 中,该类还提供可自动设置数据类型的 setValue() 方法。

前述方法可写入单个列值。 StreamWriter 还提供用于从映射写入完整行的方法。setRowFromMap() 方法使用列名称和值的映射,并将所有值写入到相应的列。此方法不定义新列,而是仅将值写入到现有列。JsonParser 示例使用此方法写入任意 JSON 输入。(请参阅Java 示例:JSON 解析器。)

setRowsFromMap() 还会使用提供的完整映射填充 Flex 表(请参阅 Flex 表)的任何 VMap ('raw') 列。在大多数情况下,setRowsFromMap() 是用于填充弹性表的适当方法。但是,您也可以使用 setVMap()(类似于其他 setValue() 方法)在指定的列中生成 VMap 值。

setRowFromMap() 方法可自动强制将输入值转换为使用关联的 TypeCoercion 为这些列定义的类型。在大多数情况下,使用默认实施 (StandardTypeCoercion) 都是合适的。

TypeCoercion 使用策略来控制其行为。例如,FAIL_INVALID_INPUT_VALUE 策略指示将无效输入视为错误,而非使用 null 值。系统会捕获错误,并将其作为拒绝进行处理(请参阅用户定义的解析器中的“拒绝行”)。策略还能控制是否截断太长的输入。对解析器的 TypeCoercion 使用 setPolicy() 方法可设置策略。有关支持的值,请参阅 API 文档。

除了设置这些策略之外,您可能还需要自定义类型强制转换。要执行此操作,请子类化提供的 TypeCoercion 实施之一,并覆盖 asType() 方法。如果解析器将读取来自第三方库的对象,则有必要进行此自定义。例如,用于处理地理坐标的解析器可以覆盖 asLong,以将诸如“40.4397N”等输入转换为数字。有关实施的列表,请参阅 Vertica API 文档。

UDParser API 提供了以下通过子类扩展的方法:


class PyUDParser(vertica_sdk.UDSource):
    def __init__(self):
        pass
    def setup(srvInterface, returnType):
        pass
    def process(self, srvInterface, inputbuffer, inputstate, streamwriter):
        # User implement process function.
        # User reads data from inputbuffer and parse the data.
        # Rows are emitted via the streamwriter argument
        return StreamState.DONE

在 Python 中,process() 方法需要输入缓冲区和输出缓冲区(请参阅 InputBuffer API 和 OutputBuffer API)。输入缓冲区表示您要解析的信息源。输出缓冲区会将筛选后的信息传递给 Vertica。

如果筛选器拒绝记录,请使用方法 REJECT() 来识别被拒绝的数据和拒绝原因。

3.2 - UDChunker 类

您可以子类化 UDChunker 类以允许解析器支持 协作解析。此类仅在 C++ API 中可用。

从根本上说,UDChunker 是一个非常简单的解析器。与 UDParser 一样,它包含以下三个方法:setup()process()destroy()。您必须覆盖 process(),而且可以覆盖其他方法。此类包含一个额外的方法 alignPortion(),您必须在要为 UDChunker 启用 分摊加载时实施该方法。

设置和分解

UDParser 一样,您可以为块分割器定义初始化和清理代码。Vertica 会在第一次调用 process() 之前先调用 setup(),并在最后一次调用 process() 之后才调用 destroy()。您的对象可能会在多个加载源之间重用,因此请确保 setup() 会完全初始化所有字段。

分块

Vertica 会调用 process() 以将输入拆分为可以独立解析的块。该方法会采用输入缓冲区和输入状态指示器:

  • OK:输入缓冲区从流的开头或中间位置开始。

  • END_OF_FILE:无更多数据可用。

  • END_OF_PORTION:源已到达其部分的结尾。此状态仅在使用分摊加载时出现。

如果输入状态为 END_OF_FILE,则块分割器应将 input.offset 标记设置为 input.size 并返回 DONE。返回 INPUT_NEEDED 是错误的。

如果输入状态为 OK,则块分割器应从输入缓冲区读取数据并找到记录边界。如果它找到至少一条记录的结尾,它应将 input.offset 标记与缓冲区中最后一条记录结尾之后的字节对齐并返回 CHUNK_ALIGNED。例如,如果输入是“abc~def”并且“~”是记录终止符,则此方法应将 input.offset 设置为 4,即“d”的位置。如果 process() 在没有找到记录边界的情况下到达输入的结尾,则应返回 INPUT_NEEDED

您可以将输入拆分成更小的块,但使用输入中的所有可用记录可以提高性能。例如,块分割器可以从输入的结尾向后扫描以找到记录终止符(它可能是输入中诸多记录的最后一条),并将其作为一个块全部返回,而不扫描剩余输入。

如果输入状态为 END_OF_PORTION,则块分割器的行为应与输入状态 OK 的行为相同,只不过它还应设置一个标记。再次调用时,它应在下一部分中找到第一条记录,并将块与该记录对齐。

输入数据可能包含 null 字节(如果源文件包含这种字节)。输入实参不会自动以 null 值终止。

process() 方法不得被无限期阻止。如果此方法在很长一段时间内无法继续执行,它应返回 KEEP_GOING。未能返回 KEEP_GOING 将导致多种后果,例如导致用户无法取消查询。

有关使用分块的 process() 方法的示例,请参阅 C++ 示例:分隔解析器和块分割器

对齐部分

如果块分割器支持分摊加载,请实施 alignPortion() 方法。在调用 process() 之前,Vertica 会调用此方法一次或多次,以将输入偏移量与该部分中第一个完整块的开头对齐。该方法会采用输入缓冲区和输入状态指示器:

  • START_OF_PORTION:缓冲区的开头对应于该部分的开头。可以使用 getPortion() 方法来访问该部分的偏移量和大小。

  • OK:输入缓冲区在相应部分的中间。

  • END_OF_PORTION:缓冲区的结尾对应于相应部分的结尾或超出相应部分的结尾。

  • END_OF_FILE:无更多数据可用。

该方法应从缓冲区的开头扫描到第一个完整记录的开头。它应该将 input.offset 设置为此位置并返回以下值之一:

  • DONE(如果找到块)。 input.offset 为块的第一个字节。

  • INPUT_NEEDED(如果输入缓冲区不包含任何块的开头)。从 END_OF_FILE 的输入状态返回此值是错误的。

  • REJECT(如果相应部分而非缓冲区不包含任何块的开头)。

API

UDChunker API 提供了以下通过子类扩展的方法:

virtual void setup(ServerInterface &srvInterface,
            SizedColumnTypes &returnType);

virtual StreamState alignPortion(ServerInterface &srvInterface,
            DataBuffer &input, InputState state);

virtual StreamState process(ServerInterface &srvInterface,
            DataBuffer &input, InputState input_state)=0;

virtual void cancel(ServerInterface &srvInterface);

virtual void destroy(ServerInterface &srvInterface,
            SizedColumnTypes &returnType);

3.3 - ParserFactory 类

如果编写解析器,您还必须编写用于生成解析器实例的工厂。为此,请对 ParserFactory 类设置子类。

解析器工厂是单例。您的子类必须为无状态子类,没有包含数据的字段。子类还不得修改任何全局变量。

ParserFactory 类定义了以下方法。您的子类必须覆盖 prepare() 方法。可以覆盖其他方法。

设置

Vertica 将在启动程序节点上调用 plan() 一次,以执行下列任务:

  • 检查已从 COPY 语句中的函数调用传递的任何参数和错误消息(如果出现任何问题)。您通过从传递至 plan() 方法的 ServerInterface 的实例中获得 ParamReader 对象读取参数。

  • 存储各个主机解析数据所需的任何信息。例如,可以将参数存储在通过 planCtxt 参数传入的 PlanContext 实例中。plan() 方法仅在启动程序节点上运行,而 prepareUDSources() 方法则在每个读取数据源的主机上运行。因此,该对象是它们之间的唯一通信方式。

    您通过从 getWriter() 方法中获取 ParamWriter 对象在 PlanContext 中存储数据。然后,通过对 ParamWriter 调用方法(比如 setString)写入参数。

创建解析器

Vertica 将在每个节点上调用 prepare(),以使用由 plan() 方法存储的数据来创建并初始化解析器。

定义参数

实施 getParameterTypes() 可定义解析器所使用的参数的名称和类型。Vertica 使用此信息对调用人员发出有关未知或缺失参数的警告。Vertica 会忽略未知参数并为缺失参数使用默认值。当您需要为函数定义类型和参数时,您不需要覆盖此方法。

定义解析器输出

实施 getParserReturnType() 可定义解析器输出的表列的数据类型。如果适用,getParserReturnType() 还可以定义数据类型的大小、精度和小数位数。通常,此方法从 argTypeperColumnParamReader 参数读取输出表的数据类型,并验证是否能够输出相应的数据类型。如果 getParserReturnType() 已准备好输出数据类型,则它会对 returnType 参数中传递的 SizedColumnTypes 对象调用方法。除了输出列的数据类型之外,方法还应指定有关列的数据类型的任何附加信息:

  • 对于二进制和字符串数据类型(例如,CHAR、VARCHAR 和 LONG VARBINARY),请指定其最大长度。

  • 对于 NUMERIC 类型,请指定其精度和小数位数。

  • 对于 Time/Timestamp 类型(无论是否带有时区),请指定其精度(-1 表示未指定)。

  • 对于 ARRAY 类型,请指定元素的最大数量。

  • 对于所有其他类型,无需指定长度或精度。

支持协作解析

要支持协作解析,请实施 prepareChunker() 并返回 UDChunker 子类的实例。如果 isChunkerApportionable() 返回 true,则此方法返回 null 是错误的。

目前仅在 C++ API 中支持协作解析。

支持分摊加载

要支持分摊加载,解析器、块分割器或这两者都必须支持分摊。要指示解析器可以分摊加载,请实施 isParserApportionable() 并返回 true。要指示块分割器可以分摊加载,请实施 isChunkerApportionable() 并返回 true

isChunkerApportionable() 方法会将 ServerInterface 作为实参,因此您可以访问 COPY 语句中提供的参数。例如,如果用户可以指定记录分隔符,您可能需要此类信息。当且仅当工厂可以为此输入创建块分割器时,此方法将返回 true

API

ParserFactory API 提供了以下通过子类扩展的方法:

virtual void plan(ServerInterface &srvInterface, PerColumnParamReader &perColumnParamReader, PlanContext &planCtxt);

virtual UDParser * prepare(ServerInterface &srvInterface, PerColumnParamReader &perColumnParamReader,
            PlanContext &planCtxt, const SizedColumnTypes &returnType)=0;

virtual void getParameterType(ServerInterface &srvInterface, SizedColumnTypes &parameterTypes);

virtual void getParserReturnType(ServerInterface &srvInterface, PerColumnParamReader &perColumnParamReader,
            PlanContext &planCtxt, const SizedColumnTypes &argTypes,
            SizedColumnTypes &returnType);

virtual bool isParserApportionable();

// C++ API only:
virtual bool isChunkerApportionable(ServerInterface &srvInterface);

virtual UDChunker * prepareChunker(ServerInterface &srvInterface, PerColumnParamReader &perColumnParamReader,
            PlanContext &planCtxt, const SizedColumnTypes &returnType);

如果要使用 分摊加载 将单个输入拆分成多个加载流,请实施 isParserApportionable() 和/或 isChunkerApportionable() 并返回 true。即使这些方法返回 true,也不能保证 Vertica 分摊加载。然而,如果这两个方法均返回 false,则表示解析器不会尝试这么做。

如果要使用 协作解析,请实施 prepareChunker() 并返回 UDChunker 子类的实例。仅 C++ API 支持协作解析。

Vertica 将为非隔离功能调用 prepareChunker() 方法。在隔离模式下使用函数时,此方法不可用。

如果您希望块分割器可用于分摊加载,请实施 isChunkerApportionable() 并返回 true

创建 ParserFactory 之后,您必须将其注册到 RegisterFactory 宏。

ParserFactory API 提供了以下通过子类扩展的方法:

public void plan(ServerInterface srvInterface, PerColumnParamReader perColumnParamReader, PlanContext planCtxt)
    throws UdfException;

public abstract UDParser prepare(ServerInterface srvInterface, PerColumnParamReader perColumnParamReader,
                PlanContext planCtxt, SizedColumnTypes returnType)
    throws UdfException;

public void getParameterType(ServerInterface srvInterface, SizedColumnTypes parameterTypes);

public void getParserReturnType(ServerInterface srvInterface, PerColumnParamReader perColumnParamReader,
                PlanContext planCtxt, SizedColumnTypes argTypes, SizedColumnTypes returnType)
    throws UdfException;

ParserFactory API 提供了以下通过子类扩展的方法:

class PyParserFactory(vertica_sdk.SourceFactory):
    def __init__(self):
        pass
    def plan(self):
        pass
    def prepareUDSources(self, srvInterface):
        # User implement the function to create PyUDParser.
        pass

3.4 - C++ 示例: BasicIntegerParser

BasicIntegerParser 示例解析由非数字字符分隔的整数字符串。有关使用持续加载的此解析器的版本,请参阅 C++ 示例: ContinuousIntegerParser

加载和使用示例

按如下所示加载并使用 BasicIntegerParser 示例。

=> CREATE LIBRARY BasicIntegerParserLib AS '/home/dbadmin/BIP.so';

=> CREATE PARSER BasicIntegerParser AS
LANGUAGE 'C++' NAME 'BasicIntegerParserFactory' LIBRARY BasicIntegerParserLib;

=> CREATE TABLE t (i integer);

=> COPY t FROM stdin WITH PARSER BasicIntegerParser();
0
1
2
3
4
5
\.

实施

BasicIntegerParser 类仅实施 API 中的 process() 方法。(它还实施了一个用于类型转换的 helper 方法。)此方法处理每一行输入,查找每一行的数字。当它前进到新行时,它会移动 input.offset 标记并检查输入状态。然后它写入输出。

    virtual StreamState process(ServerInterface &srvInterface, DataBuffer &input,
                InputState input_state) {
        // WARNING: This implementation is not trying for efficiency.
        // It is trying for simplicity, for demonstration purposes.

        size_t start = input.offset;
        const size_t end = input.size;

        do {
            bool found_newline = false;
            size_t numEnd = start;
            for (; numEnd < end; numEnd++) {
                if (input.buf[numEnd] < '0' || input.buf[numEnd] > '9') {
                    found_newline = true;
                    break;
                }
            }

            if (!found_newline) {
                input.offset = start;
                if (input_state == END_OF_FILE) {
                    // If we're at end-of-file,
                    // emit the last integer (if any) and return DONE.
                    if (start != end) {
                        writer->setInt(0, strToInt(input.buf + start, input.buf + numEnd));
                        writer->next();
                    }
                    return DONE;
                } else {
                    // Otherwise, we need more data.
                    return INPUT_NEEDED;
                }
            }

            writer->setInt(0, strToInt(input.buf + start, input.buf + numEnd));
            writer->next();

            start = numEnd + 1;
        } while (true);
    }
};

在工厂中,plan() 方法是无操作的;没有要检查的参数。prepare() 方法使用 SDK 提供的宏实例化解析器:

    virtual UDParser* prepare(ServerInterface &srvInterface,
            PerColumnParamReader &perColumnParamReader,
            PlanContext &planCtxt,
            const SizedColumnTypes &returnType) {

        return vt_createFuncObject<BasicIntegerParser>(srvInterface.allocator);
    }

getParserReturnType() 方法声明了单个输出:

    virtual void getParserReturnType(ServerInterface &srvInterface,
            PerColumnParamReader &perColumnParamReader,
            PlanContext &planCtxt,
            const SizedColumnTypes &argTypes,
            SizedColumnTypes &returnType) {
        // We only and always have a single integer column
        returnType.addInt(argTypes.getColumnName(0));
    }

对于所有用 C++ 编写的 UDx,该示例以注册其工厂结束:

RegisterFactory(BasicIntegerParserFactory);

3.5 - C++ 示例: ContinuousIntegerParser

ContinuousIntegerParser 示例是 BasicIntegerParser 的变体。这两个示例都从输入字符串中解析整数。 ContinuousIntegerParser 使用 连续加载 读取数据。

加载和使用示例

按如下所示加载 ContinuousIntegerParser 示例。

=> CREATE LIBRARY ContinuousIntegerParserLib AS '/home/dbadmin/CIP.so';

=> CREATE PARSER ContinuousIntegerParser AS
LANGUAGE 'C++' NAME 'ContinuousIntegerParserFactory'
LIBRARY ContinuousIntegerParserLib;

以使用 BasicIntegerParser 的同样方式使用它。请参阅C++ 示例: BasicIntegerParser

实施

ContinuousIntegerParserContinuousUDParser 的子类。ContinuousUDParser 的子类将处理逻辑放在 run() 方法中。

    virtual void run() {

        // This parser assumes a single-column input, and
        // a stream of ASCII integers split by non-numeric characters.
        size_t pos = 0;
        size_t reserved = cr.reserve(pos+1);
        while (!cr.isEof() || reserved == pos + 1) {
            while (reserved == pos + 1 && isdigit(*ptr(pos))) {
                pos++;
                reserved = cr.reserve(pos + 1);
            }

            std::string st(ptr(), pos);
            writer->setInt(0, strToInt(st));
            writer->next();

            while (reserved == pos + 1 && !isdigit(*ptr(pos))) {
                pos++;
                reserved = cr.reserve(pos + 1);
            }
            cr.seek(pos);
            pos = 0;
            reserved = cr.reserve(pos + 1);
        }
    }
};

有关 ContinuousUDParser 的更复杂示例,请参阅示例中的 ExampleDelimitedParser。(请参阅下载并运行 UDx 示例代码。) ExampleDelimitedParser 使用块分割器;请参阅 C++ 示例:分隔解析器和块分割器

3.6 - Java 示例:数字文本

NumericTextParser 示例可解析以单词而非数字表示的整数值(例如,"one two three" 代表一百二十三)。解析器将执行下列操作:

  • 接受单个参数,以设置用来分隔数据行中各列的字符。分隔符默认设置为管道 (|) 字符。

  • 忽略额外空格和用于表示数字的单词的首字母大写。

  • 使用以下单词标识数字:zero、one、two、three、four、five、six、seven、eight 和 nine。

  • 假设表示整数的单词至少用一个空格分隔。

  • 拒绝无法完全解析为整数的任何数据行。

  • 在输出表包含非整数列时生成错误。

加载和使用示例

按如下所示加载并使用解析器:

=> CREATE LIBRARY JavaLib AS '/home/dbadmin/JavaLib.jar' LANGUAGE 'JAVA';
CREATE LIBRARY

=> CREATE PARSER NumericTextParser AS LANGUAGE 'java'
->    NAME 'com.myCompany.UDParser.NumericTextParserFactory'
->    LIBRARY JavaLib;
CREATE PARSER FUNCTION
=> CREATE TABLE t (i INTEGER);
CREATE TABLE
=> COPY t FROM STDIN WITH PARSER NumericTextParser();
Enter data to be copied followed by a newline.
End with a backslash and a period on a line by itself.
>> One
>> Two
>> One Two Three
>> \.
=> SELECT * FROM t ORDER BY i;
  i
-----
   1
   2
 123
(3 rows)

=> DROP TABLE t;
DROP TABLE
=> -- Parse multi-column input
=> CREATE TABLE t (i INTEGER, j INTEGER);
CREATE TABLE
=> COPY t FROM stdin WITH PARSER NumericTextParser();
Enter data to be copied followed by a newline.
End with a backslash and a period on a line by itself.
>> One | Two
>> Two | Three
>> One Two Three | four Five Six
>> \.
=> SELECT * FROM t ORDER BY i;
  i  |  j
-----+-----
   1 |   2
   2 |   3
 123 | 456
(3 rows)

=> TRUNCATE TABLE t;
TRUNCATE TABLE
=> -- Use alternate separator character
=> COPY t FROM STDIN WITH PARSER NumericTextParser(separator='*');
Enter data to be copied followed by a newline.
End with a backslash and a period on a line by itself.
>> Five * Six
>> seven * eight
>> nine * one zero
>> \.
=> SELECT * FROM t ORDER BY i;
 i | j
---+----
 5 |  6
 7 |  8
 9 | 10
(3 rows)

=> TRUNCATE TABLE t;
TRUNCATE TABLE

=> -- Rows containing data that does not parse into digits is rejected.
=> DROP TABLE t;
DROP TABLE
=> CREATE TABLE t (i INTEGER);
CREATE TABLE
=> COPY t FROM STDIN WITH PARSER NumericTextParser();
Enter data to be copied followed by a newline.
End with a backslash and a period on a line by itself.
>> One Zero Zero
>> Two Zero Zero
>> Three Zed Zed
>> Four Zero Zero
>> Five Zed Zed
>> \.
SELECT * FROM t ORDER BY i;
  i
-----
 100
 200
 400
(3 rows)

=> -- Generate an error by trying to copy into a table with a non-integer column
=> DROP TABLE t;
DROP TABLE
=> CREATE TABLE t (i INTEGER, j VARCHAR);
CREATE TABLE
=> COPY t FROM STDIN WITH PARSER NumericTextParser();
vsql:UDParse.sql:94: ERROR 3399:  Failure in UDx RPC call
InvokeGetReturnTypeParser(): Error in User Defined Object [NumericTextParser],
error code: 0
com.vertica.sdk.UdfException: Column 2 of output table is not an Int
        at com.myCompany.UDParser.NumericTextParserFactory.getParserReturnType
        (NumericTextParserFactory.java:70)
        at com.vertica.udxfence.UDxExecContext.getReturnTypeParser(
        UDxExecContext.java:1464)
        at com.vertica.udxfence.UDxExecContext.getReturnTypeParser(
        UDxExecContext.java:768)
        at com.vertica.udxfence.UDxExecContext.run(UDxExecContext.java:236)
        at java.lang.Thread.run(Thread.java:662)

解析器实施

以下代码可实施解析器。

package com.myCompany.UDParser;

import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

import com.vertica.sdk.DataBuffer;
import com.vertica.sdk.DestroyInvocation;
import com.vertica.sdk.RejectedRecord;
import com.vertica.sdk.ServerInterface;
import com.vertica.sdk.State.InputState;
import com.vertica.sdk.State.StreamState;
import com.vertica.sdk.StreamWriter;
import com.vertica.sdk.UDParser;
import com.vertica.sdk.UdfException;

public class NumericTextParser extends UDParser {

    private String separator; // Holds column separator character

    // List of strings that we accept as digits.
    private List<String> numbers = Arrays.asList("zero", "one",
            "two", "three", "four", "five", "six", "seven",
            "eight", "nine");

    // Hold information about the last rejected row.
    private String rejectedReason;
    private String rejectedRow;

    // Constructor gets the separator character from the Factory's prepare()
    // method.
    public NumericTextParser(String sepparam) {
        super();
        this.separator = sepparam;
    }

    // Called to perform the actual work of parsing. Gets a buffer of bytes
    // to turn into tuples.
    @Override
    public StreamState process(ServerInterface srvInterface, DataBuffer input,
            InputState input_state) throws UdfException, DestroyInvocation {

        int i=input.offset; // Current position in the input buffer
        // Flag to indicate whether we just found the end of a row.
        boolean lastCharNewline = false;
        // Buffer to hold the row of data being read.
        StringBuffer line = new StringBuffer();

        //Continue reading until end of buffer.
        for(; i < input.buf.length; i++){
            // Loop through input until we find a linebreak: marks end of row
            char inchar = (char) input.buf[i];
            // Note that this isn't a robust way to find rows. It should
            // accept a user-defined row separator. Also, the following
            // assumes ASCII line break metheods, which isn't a good idea
            // in the UTF world. But it is good enough for this simple example.
            if (inchar != '\n' && inchar != '\r') {
                // Keep adding to a line buffer until a full row of data is read
                line.append(inchar);
                lastCharNewline = false; // Last character not a new line
            } else {
                // Found a line break. Process the row.
                lastCharNewline = true; // indicate we got a complete row
                // Update the position in the input buffer. This is updated
                // whether the row is successfully processed or not.
                input.offset = i+1;
                // Call procesRow to extract values and write tuples to the
                // output. Returns false if there was an error.
                if (!processRow(line)) {
                    // Processing row failed. Save bad row to rejectedRow field
                    // and return to caller indicating a rejected row.
                    rejectedRow = line.toString();
                    // Update position where we processed the data.
                    return StreamState.REJECT;
                }
                line.delete(0, line.length()); // clear row buffer
            }
        }

        // At this point, process() has finished processing the input buffer.
        // There are two possibilities: need to get more data
        // from the input stream to finish processing, or there is
        // no more data to process. If at the end of the input stream and
        // the row was not terminated by a linefeed, it may need
        // to process the last row.

        if (input_state == InputState.END_OF_FILE && lastCharNewline) {
            // End of input and it ended on a newline. Nothing more to do
            return StreamState.DONE;
        } else if (input_state == InputState.END_OF_FILE && !lastCharNewline) {
            // At end of input stream but didn't get a final newline. Need to
            // process the final row that was read in, then exit for good.
            if (line.length() == 0) {
                // Nothing to process. Done parsing.
                return StreamState.DONE;
            }
            // Need to parse the last row, not terminated by a linefeed. This
            // can occur if the file being read didn't have a final line break.
            if (processRow(line)) {
                return StreamState.DONE;
            } else {
                // Processing last row failed. Save bad row to rejectedRow field
                // and return to caller indicating a rejected row.
                rejectedRow = line.toString();
                // Tell Vertica the entire buffer was processed so it won't
                // call again to have the line processed.
                input.offset = input.buf.length;
                return StreamState.REJECT;
            }
        } else {
            // Stream is not fully read, so tell Vertica to send more. If
            // process() didn't get a complete row before it hit the end of the
            // input buffer, it will end up re-processing that segment again
            // when more data is added to the buffer.
            return StreamState.INPUT_NEEDED;
        }
    }

    // Breaks a row into columns, then parses the content of the
    // columns. Returns false if there was an error parsing the
    // row, in which case it sets the rejected row to the input
    // line. Returns true if the row was successfully read.
    private boolean processRow(StringBuffer line)
                                throws UdfException, DestroyInvocation {
        String[] columns = line.toString().split(Pattern.quote(separator));
        // Loop through the columns, decoding their contents
        for (int col = 0; col < columns.length; col++) {
            // Call decodeColumn to extract value from this column
            Integer colval = decodeColumn(columns[col]);
            if (colval == null) {
                // Could not parse one of the columns. Indicate row should
                // be rejected.
                return false;
            }
            // Column parsed OK. Write it to the output. writer is a field
            // provided by the parent class. Since this parser only accepts
            // integers, there is no need to verify that data type of the parsed
            // data matches the data type of the column being written. In your
            // UDParsers, you may want to perform this verification.
            writer.setLong(col,colval);
        }
        // Done with the row of data. Advance output to next row.

        // Note that this example does not verify that all of the output columns
        // have values assigned to them. If there are missing values at the
        // end of a row, they get automatically get assigned a default value
        // (0 for integers). This isn't a robust solution. Your UDParser
        // should perform checks here to handle this situation and set values
        // (such as null) when appropriate.
        writer.next();
        return true; // Successfully processed the row.
    }

    // Gets a string with text numerals, i.e. "One Two Five Seven" and turns
    // it into an integer value, i.e. 1257. Returns null if the string could not
    // be parsed completely into numbers.
    private Integer decodeColumn(String text) {
        int value = 0; // Hold the value being parsed.

        // Split string into individual words. Eat extra spaces.
        String[] words = text.toLowerCase().trim().split("\\s+");

        // Loop through the words, matching them against the list of
        // digit strings.
        for (int i = 0; i < words.length; i++) {
            if (numbers.contains(words[i])) {
                // Matched a digit. Add the it to the value.
                int digit = numbers.indexOf(words[i]);
                value = (value * 10) + digit;
            } else {
                // The string didn't match one of the accepted string values
                // for digits. Need to reject the row. Set the rejected
                // reason string here so it can be incorporated into the
                // rejected reason object.
                //
                // Note that this example does not handle null column values.
                // In most cases, you want to differentiate between an
                // unparseable column value and a missing piece of input
                // data. This example just rejects the row if there is a missing
                // column value.
                rejectedReason = String.format(
                        "Could not parse '%s' into a digit",words[i]);
                return null;
            }
        }
        return value;
    }

    // Vertica calls this method if the parser rejected a row of data
    // to find out what went wrong and add to the proper logs. Just gathers
    // information stored in fields and returns it in an object.
    @Override
    public RejectedRecord getRejectedRecord() throws UdfException {
        return new RejectedRecord(rejectedReason,rejectedRow.toCharArray(),
                rejectedRow.length(), "\n");
    }
}

ParserFactory 实施

以下代码可实施解析器工厂。

NumericTextParser 接受名为 separator 的单个可选参数。您可以在 getParameterType() 方法中定义此参数,而 plan() 方法可存储此参数的值。 NumericTextParser 仅输出整数值。因此,如果输出表包含数据类型是非整数的列,getParserReturnType() 方法将抛出异常。

package com.myCompany.UDParser;

import java.util.regex.Pattern;

import com.vertica.sdk.ParamReader;
import com.vertica.sdk.ParamWriter;
import com.vertica.sdk.ParserFactory;
import com.vertica.sdk.PerColumnParamReader;
import com.vertica.sdk.PlanContext;
import com.vertica.sdk.ServerInterface;
import com.vertica.sdk.SizedColumnTypes;
import com.vertica.sdk.UDParser;
import com.vertica.sdk.UdfException;
import com.vertica.sdk.VerticaType;

public class NumericTextParserFactory extends ParserFactory {

    // Called once on the initiator host to check the parameters and set up the
    // context data that hosts performing processing will need later.
    @Override
    public void plan(ServerInterface srvInterface,
                PerColumnParamReader perColumnParamReader,
                PlanContext planCtxt) {

        String separator = "|"; // assume separator is pipe character

        // See if a parameter was given for column separator
        ParamReader args = srvInterface.getParamReader();
        if (args.containsParameter("separator")) {
            separator = args.getString("separator");
            if (separator.length() > 1) {
                throw new UdfException(0,
                        "Separator parameter must be a single character");
            }
            if (Pattern.quote(separator).matches("[a-zA-Z]")) {
                throw new UdfException(0,
                        "Separator parameter cannot be a letter");
            }
        }

        // Save separator character in the Plan Data
        ParamWriter context = planCtxt.getWriter();
        context.setString("separator", separator);
    }

    // Define the data types of the output table that the parser will return.
    // Mainly, this just ensures that all of the columns in the table which
    // is the target of the data load are integer.
    @Override
    public void getParserReturnType(ServerInterface srvInterface,
                PerColumnParamReader perColumnParamReader,
                PlanContext planCtxt,
                SizedColumnTypes argTypes,
                SizedColumnTypes returnType) {

        // Get access to the output table's columns
        for (int i = 0; i < argTypes.getColumnCount(); i++ ) {
            if (argTypes.getColumnType(i).isInt()) {
                // Column is integer... add it to the output
                 returnType.addInt(argTypes.getColumnName(i));
            } else {
                // Column isn't an int, so throw an exception.
                // Technically, not necessary since the
                // UDx framework will automatically error out when it sees a
                // Discrepancy between the type in the target table and the
                // types declared by this method. Throwing this exception will
                // provide a clearer error message to the user.
                String message = String.format(
                    "Column %d of output table is not an Int", i + 1);
                throw new UdfException(0, message);
            }
        }
    }

    // Instantiate the UDParser subclass named NumericTextParser. Passes the
    // separator characetr as a paramter to the constructor.
    @Override
    public UDParser prepare(ServerInterface srvInterface,
            PerColumnParamReader perColumnParamReader, PlanContext planCtxt,
            SizedColumnTypes returnType) throws UdfException {
        // Get the separator character from the context
        String separator = planCtxt.getReader().getString("separator");
        return new NumericTextParser(separator);
    }

    // Describe the parameters accepted by this parser.
    @Override
    public void getParameterType(ServerInterface srvInterface,
             SizedColumnTypes parameterTypes) {
        parameterTypes.addVarchar(1, "separator");
    }
}

3.7 - Java 示例:JSON 解析器

JSON 解析器会使用 JSON 对象流。每个对象都必须具有正确格式,并且必须位于输入中的一行上。可以使用换行符分隔对象。此解析器使用字段名称作为映射中的键,而这些键将成为表中的列名称。可以在 /opt/vertica/packages/flextable/examples 中找到此示例的代码。此目录还包含一个示例数据文件。

此示例使用 setRowFromMap() 方法写入数据。

加载和使用示例

使用第三方库 (gson-2.2.4.jar) 加载库并定义 JSON 解析器,如下所示。有关下载 URL,请参阅 JsonParser.java 中的注释:

=> CREATE LIBRARY json
-> AS '/opt/vertica/packages/flextable/examples/java/output/json.jar'
-> DEPENDS '/opt/vertica/bin/gson-2.2.4.jar' language 'java';
CREATE LIBRARY

=> CREATE PARSER JsonParser AS LANGUAGE 'java'
-> NAME 'com.vertica.flex.JsonParserFactory' LIBRARY json;
CREATE PARSER FUNCTION

您现在可以定义一个表,然后使用 JSON 解析器将数据加载到其中,如下所示:

=> CREATE TABLE mountains(name varchar(64), type varchar(32), height integer);
CREATE TABLE

=> COPY mountains FROM '/opt/vertica/packages/flextable/examples/mountains.json'
-> WITH PARSER JsonParser();
-[ RECORD 1 ]--
Rows Loaded | 2

=> SELECT * from mountains;
-[ RECORD 1 ]--------
name   | Everest
type   | mountain
height | 29029
-[ RECORD 2 ]--------
name   | Mt St Helens
type   | volcano
height |

该数据文件包含一个未加载的值 (hike_safety),因为表定义不包括该列。该数据文件遵循以下格式:

{ "name": "Everest", "type":"mountain", "height": 29029, "hike_safety": 34.1  }
{ "name": "Mt St Helens", "type": "volcano", "hike_safety": 15.4 }

实施

以下代码显示了 JsonParser.java 中的 process() 方法。解析器会尝试将输入读取到一个 Map. 中。如果读取成功,JSON 解析器将调用 setRowFromMap()

    @Override
    public StreamState process(ServerInterface srvInterface, DataBuffer input,
            InputState inputState) throws UdfException, DestroyInvocation {
        clearReject();
        StreamWriter output = getStreamWriter();

        while (input.offset < input.buf.length) {
            ByteBuffer lineBytes = consumeNextLine(input, inputState);

            if (lineBytes == null) {
                return StreamState.INPUT_NEEDED;
            }

            String lineString = StringUtils.newString(lineBytes);

            try {
                Map map = gson.fromJson(lineString, parseType);

                if (map == null) {
                    continue;
                }

                output.setRowFromMap(map);
                // No overrides needed, so just call next() here.
         output.next();
            } catch (Exception ex) {
                setReject(lineString, ex);
                return StreamState.REJECT;
            }
        }

JsonParserFactory.java 工厂会将解析器实例化并在 prepare() 方法中返回该解析器。您不需要执行其他设置。

3.8 - C++ 示例:分隔解析器和块分割器

ExampleDelimitedUDChunker 类在分隔符处划分输入。您可以将此块分割器与任何理解分隔输入的解析器一起使用。 ExampleDelimitedParser 是一个使用这个块分割器的 ContinuousUDParser 子类。

加载和使用示例

按如下所示加载并使用示例。

=> CREATE LIBRARY ExampleDelimitedParserLib AS '/home/dbadmin/EDP.so';

=> CREATE PARSER ExampleDelimitedParser AS
    LANGUAGE 'C++' NAME 'DelimitedParserFrameworkExampleFactory'
    LIBRARY ExampleDelimitedParserLib;

=> COPY t FROM stdin WITH PARSER ExampleDelimitedParser();
0
1
2
3
4
5
6
7
8
9
\.

块分割器实施

该块分割器支持分摊加载。alignPortion() 方法在当前部分找到第一个完整记录的开头,并将输入缓冲区与其对齐。记录终止符作为实参传递并在构造函数中设置。

StreamState ExampleDelimitedUDChunker::alignPortion(
            ServerInterface &srvInterface,
            DataBuffer &input, InputState state)
{
    /* find the first record terminator.  Its record belongs to the previous portion */
    void *buf = reinterpret_cast<void *>(input.buf + input.offset);
    void *term = memchr(buf, recordTerminator, input.size - input.offset);

    if (term) {
        /* record boundary found.  Align to the start of the next record */
        const size_t chunkSize = reinterpret_cast<size_t>(term) - reinterpret_cast<size_t>(buf);
        input.offset += chunkSize
            + sizeof(char) /* length of record terminator */;

        /* input.offset points at the start of the first complete record in the portion */
        return DONE;
    } else if (state == END_OF_FILE || state == END_OF_PORTION) {
        return REJECT;
    } else {
        VIAssert(state == START_OF_PORTION || state == OK);
        return INPUT_NEEDED;
    }
}

process() 方法必须考虑跨越部分边界的块。如果之前的调用在某个部分的末尾,则该方法设置一个标志。代码首先检查并处理该条件。逻辑与 alignPortion() 类似,所以示例调用它来做部分除法。

StreamState ExampleDelimitedUDChunker::process(
                ServerInterface &srvInterface,
                DataBuffer &input,
                InputState input_state)
{
    const size_t termLen = 1;
    const char *terminator = &recordTerminator;

    if (pastPortion) {
        /*
         * Previous state was END_OF_PORTION, and the last chunk we will produce
         * extends beyond the portion we started with, into the next portion.
         * To be consistent with alignPortion(), that means finding the first
         * record boundary, and setting the chunk to be at that boundary.
         * Fortunately, this logic is identical to aligning the portion (with
         * some slight accounting for END_OF_FILE)!
         */
        const StreamState findLastTerminator = alignPortion(srvInterface, input);

        switch (findLastTerminator) {
            case DONE:
                return DONE;
            case INPUT_NEEDED:
                if (input_state == END_OF_FILE) {
                    /* there is no more input where we might find a record terminator */
                    input.offset = input.size;
                    return DONE;
                }
                return INPUT_NEEDED;
            default:
                VIAssert("Invalid return state from alignPortion()");
        }
        return findLastTerminator;
    }

现在,该方法查找分隔符。如果输入从一个部分的末尾开始,它会设置标志。

    size_t ret = input.offset, term_index = 0;
    for (size_t index = input.offset; index < input.size; ++index) {
        const char c = input.buf[index];
        if (c == terminator[term_index]) {
            ++term_index;
            if (term_index == termLen) {
                ret = index + 1;
                term_index = 0;
            }
            continue;
        } else if (term_index > 0) {
            index -= term_index;
        }

        term_index = 0;
    }

    if (input_state == END_OF_PORTION) {
        /*
         * Regardless of whether or not a record was found, the next chunk will extend
         * into the next portion.
         */
        pastPortion = true;
    }

最后,process() 移动输入偏移量并返回。

    // if we were able to find some rows, move the offset to point at the start of the next (potential) row, or end of block
    if (ret > input.offset) {
        input.offset = ret;
        return CHUNK_ALIGNED;
    }

    if (input_state == END_OF_FILE) {
        input.offset = input.size;
        return DONE;
    }

    return INPUT_NEEDED;
}

工厂实施

文件 ExampleDelimitedParser.cpp 定义了一个使用此 UDChunker 的工厂。块分割器支持分摊加载,因此工厂实施 isChunkerApportionable()

    virtual bool isChunkerApportionable(ServerInterface &srvInterface) {
        ParamReader params = srvInterface.getParamReader();
        if (params.containsParameter("disable_chunker") && params.getBoolRef("d\
isable_chunker")) {
            return false;
        } else {
            return true;
        }
    }

prepareChunker() 方法创建块分割器:

    virtual UDChunker* prepareChunker(ServerInterface &srvInterface,
                                      PerColumnParamReader &perColumnParamReade\
r,
                                      PlanContext &planCtxt,
                                      const SizedColumnTypes &returnType)
    {
        ParamReader params = srvInterface.getParamReader();
        if (params.containsParameter("disable_chunker") && params.getBoolRef("d\
isable_chunker")) {
            return NULL;
        }

        std::string recordTerminator("\n");

        ParamReader args(srvInterface.getParamReader());
        if (args.containsParameter("record_terminator")) {
            recordTerminator = args.getStringRef("record_terminator").str();
        }

        return vt_createFuncObject<ExampleDelimitedUDChunker>(srvInterface.allo\
cator,
                recordTerminator[0]);
    }

3.9 - Python 示例:复杂类型的 JSON 解析器

以下示例详细说明了 UDParser,它接受 JSON 对象并将其解析为复杂类型。对于此示例,解析器假设输入数据是具有两个整数字段的行数组。输入记录应使用换行符进行分隔。如果 JSON 输入未指定任何行字段,则函数会将这些字段解析为 NULL。

此 UDParser 的源代码还包含一个工厂方法,用于解析具有整数和整数字段数组的行。解析器的实施与工厂中的返回类型无关,因此您可以创建具有不同返回类型且均指向 prepare() 方法中的 ComplexJsonParser() 类的工厂。完整的源代码位于 /opt/vertica/sdk/examples/python/UDParsers.py 中。

加载和使用示例

加载库并创建解析器,如下所示:


=> CREATE OR REPLACE LIBRARY UDParsers AS '/home/dbadmin/examples/python/UDParsers.py' LANGUAGE 'Python';

=> CREATE PARSER ComplexJsonParser AS LANGUAGE 'Python' NAME 'ArrayJsonParserFactory' LIBRARY UDParsers;

您现在可以定义一个表,然后使用 JSON 解析器将数据加载到其中,例如:


=> CREATE TABLE orders (a bool, arr array[row(a int, b int)]);
CREATE TABLE

=> COPY orders (arr) FROM STDIN WITH PARSER ComplexJsonParser();
[]
[{"a":1, "b":10}]
[{"a":1, "b":10}, {"a":null, "b":10}]
[{"a":1, "b":10},{"a":10, "b":20}]
[{"a":1, "b":10}, {"a":null, "b":null}]
[{"a":1, "b":2}, {"a":3, "b":4}, {"a":5, "b":6}, {"a":7, "b":8}, {"a":9, "b":10}, {"a":11, "b":12}, {"a":13, "b":14}]
\.

=> SELECT * FROM orders;
a |                                  arr
--+--------------------------------------------------------------------------
  | []
  | [{"a":1,"b":10}]
  | [{"a":1,"b":10},{"a":null,"b":10}]
  | [{"a":1,"b":10},{"a":10,"b":20}]
  | [{"a":1,"b":10},{"a":null,"b":null}]
  | [{"a":1,"b":2},{"a":3,"b":4},{"a":5,"b":6},{"a":7,"b":8},{"a":9,"b":10},{"a":11,"b":12},{"a":13,"b":14}]
(6 rows)

设置

所有 Python UDx 都必须导入 Vertica SDK 库。 ComplexJsonParser() 也需要 json 库。

import vertica_sdk
import json

工厂实施

prepare() 方法将实例化并返回解析器:


def prepare(self, srvInterface, perColumnParamReader, planCtxt, returnType):
    return ComplexJsonParser()

getParserReturnType() 声明返回类型必须是行数组,其中每个行都有两个整数字段:


def getParserReturnType(self, rvInterface, perColumnParamReader, planCtxt, argTypes, returnType):
    fieldTypes = vertica_sdk.SizedColumnTypes.makeEmpty()
    fieldTypes.addInt('a')
    fieldTypes.addInt('b')
    returnType.addArrayType(vertica_sdk.SizedColumnTypes.makeRowType(fieldTypes, 'elements'), 64, 'arr')

解析器实施

process() 方法会使用 InputBuffer 读入数据,然后在换行符处拆分该输入数据。随后,该方法会将处理后的数据传递给 writeRows() 方法。 writeRows() 会将每个数据行转换为 JSON 对象,检查该 JSON 对象的类型,然后将相应的值或对象写入输出。


class ComplexJsonParser(vertica_sdk.UDParser):

    leftover = ''

    def process(self, srvInterface, input_buffer, input_state, writer):
        input_buffer.setEncoding('utf-8')

        self.count = 0
        rec = self.leftover + input_buffer.read()
        row_lst = rec.split('\n')
        self.leftover = row_lst[-1]
        self.writeRows(row_lst[:-1], writer)
        if input_state == InputState.END_OF_FILE:
            self.writeRows([self.leftover], writer)
            return StreamState.DONE
        else:
            return StreamState.INPUT_NEEDED

    def writeRows(self, str_lst, writer):
        for s in str_lst:
            stripped = s.strip()
            if len(stripped) == 0:
                return
            elif len(stripped) > 1 and stripped[0:2] == "//":
                continue
            jsonValue = json.loads(stripped)
            if type(jsonValue) is list:
                writer.setArray(0, jsonValue)
            elif jsonValue is None:
                writer.setNull(0)
            else:
                writer.setRow(0, jsonValue)
            writer.next()

4 - 加载并行度

Vertica 可以拆分加载数据的工作,从而利用并行度来加快操作速度。Vertica 支持多种类型的并行度:

  • 分布式加载:Vertica 会将多文件加载中的文件分配给多个节点以并行加载,而不是在单个节点上加载所有文件。Vertica 会管理分布式加载;您无需在 UDL 中执行任何特殊操作。

  • 协作解析:在单个节点上加载的源会使用多线程来并行执行解析。协作解析会根据线程的调度方式在执行时拆分加载。您必须在解析器中启用协作解析。请参阅协作解析

  • 分摊加载:Vertica 会将单个大型文件或其他单个源拆分成多个段,以将段分配给多个节点进行并行加载。分摊加载会根据每个节点上的可用节点和核心在计划时拆分加载。您必须在源代码和解析器中启用分摊加载。请参阅分摊加载

您可以在同一 UDL 中同时支持协作解析和分摊加载。Vertica 可确定要对每个加载操作使用哪个并行度,也可能同时使用这两者。请参阅组合协作解析和分摊加载

4.1 - 协作解析

默认情况下,Vertica 会在一个数据库节点上的单个线程中解析数据源。您可以选择使用协作解析,以在节点上使用多个线程来解析源。更具体地说,源中的数据将通过块分割器,块分割器会将源流中的块分组为逻辑单元。可以并行解析这些块。块分割器会将输入拆分为可单独解析的块,然后由解析器同时进行解析。协作解析仅适用于非隔离 UDx。(请参阅隔离和非隔离模式。)

要使用协作解析,块分割器必须能够在输入中找到记录结尾标记。并非在所有输入格式中都能找到这些标记。

块分割器由解析器工厂创建。在加载时,Vertica 会先调用 UDChunker 以将输入拆分为块,然后调用 UDParser 以解析每个块。

您可以单独或同时使用协作解析和分摊加载。请参阅组合协作解析和分摊加载

Vertica 如何拆分加载

当 Vertica 收到来自源的数据时,它会重复调用块分割器的 process() 方法。块分割器本质上是一个轻量级解析器。process() 方法用于将输入拆分为块,而不是进行解析。

块分割器完成将输入拆分为块后,Vertica 会将这些块发送到尽可能多的可用解析器,从而在解析器上调用 process() 方法。

实施协作解析

要实施协作解析,请执行以下操作:

  • 子类化 UDChunker 并实施 process()

  • ParserFactory 中,实施 prepareChunker() 以返回 UDChunker

请参阅 C++ 示例:分隔解析器和块分割器以了解同样支持分摊加载的 UDChunker

4.2 - 分摊加载

解析器可以使用多个数据库节点来并行加载单个输入源。此方法称为分摊加载。在 Vertica 内置的解析器中,默认(分隔)解析器支持分摊加载。

与协作解析一样,分摊加载也需要可以在记录边界进行拆分的输入。不同之处在于,协作解析会执行顺序扫描以查找记录边界,而分摊加载会先跳(寻找)至给定位置,然后再执行扫描。某些格式(例如通用 XML)不支持寻找。

要使用分摊加载,您必须确保所有参与的数据库节点都可以访问源。分摊加载通常与分布式文件系统结合使用。

解析器可以不直接支持分摊加载,但需要包含支持分摊的块分割器。

您可以单独或同时使用分摊加载和协作解析。请参阅组合协作解析和分摊加载

Vertica 如何分摊加载

如果解析器及其源均支持分摊,则您可以指定将单个输入分发到多个数据库节点进行加载。SourceFactory 可将输入拆分为多个部分,并将它们分配给执行节点。每个 Portion 包含一个输入偏移量和一个大小。Vertica 会将各个部分及其参数分发到执行节点。在每个节点上运行的源工厂将为给定部分生成一个 UDSource

UDParser 首先会确定从何处开始解析:

  • 如果该部分是输入中的第一部分,则解析器会前进至该偏移量并开始解析。

  • 如果不是输入中的第一部分,则解析器会前进该偏移量,然后执行扫描直至找到记录的结尾。由于记录分散在多个部分,因此在到达第一个记录结尾之后开始解析。

解析器必须完成整个记录,这可能要求解析器在该部分的结尾之后继续读取。无论记录在何处结束,解析器都负责处理从已分配的部分开始的所有记录。此工作主要在解析器的 process() 方法中进行。

有时,一个部分不包含可由已分配的节点解析的任何内容。例如,假设某个记录从第 1 部分开始,而且贯穿第 2 部分并在第 3 部分结束。分配给第 1 部分的解析器将解析该记录,而分配给第 3 部分的解析器将在该记录之后开始解析。但是,分配给第 2 部分的解析器没有任何记录从此部分开始。

如果加载也使用 协作解析,则在分摊加载之后和解析之前,Vertica 会将部分拆分成块以进行并行加载。

实施分摊加载

要实施分摊加载,请在源、解析器及其工厂中执行下列操作。

在您的 SourceFactory 子类中:

  • 实施 isSourceApportionable() 并返回 true

  • 实施 plan(),以确定部分大小,指定部分,以及将各个部分分配给执行节点。要将多个部分分配给特定的执行程序,请使用计划上下文 (PlanContext::getWriter()) 上的参数编写器来传递信息。

  • 实施 prepareUDSources()。Vertica 将使用由工厂创建的计划上下文对每个执行节点调用此方法。此方法将返回 UDSource 实例,以用于该节点的已分配部分。

  • 如果源可以利用并行度,您可以实施 getDesiredThreads() 为每个源请求多个线程。有关此方法的详细信息,请参阅 SourceFactory 类

UDSource 子类中,与对任何其他源一样使用已分配的部分实施 process()。可以使用 getPortion() 检索此部分。

在您的 ParserFactory 子类中:

  • 实施 isParserApportionable() 并返回 true

  • 如果解析器使用支持分摊加载的 UDChunker,请实施 isChunkerApportionable()

在您的 UDParser 子类中:

  • 编写 UDParser 子类,使其对各个部分而非整个源执行操作。您可以通过处理流状态 PORTION_STARTPORTION_END 或使用 ContinuousUDParser API 来完成该编写。解析器必须扫描该部分的开头,查找该部分之后的第一个记录边界并解析至从该部分开始的最后一个记录的结尾。请注意,此行为可能会要求解析器在该部分的结尾之后继续读取。

  • 对某个部分不包含任何记录这一特殊情况进行如下处理:返回但不写入任何输出。

UDChunker 子类中,实施 alignPortion()。请参阅对齐部分

示例

SDK 在 ApportionLoadFunctions 目录中提供了分摊加载的 C++ 示例:

  • FilePortionSourceUDSource 的子类。

  • DelimFilePortionParserContinuousUDParser 的子类。

将这些类一起使用。您还可以将 FilePortionSource 与内置的分隔解析器结合使用。

以下示例显示了如何加载库并在数据库中创建函数:


=> CREATE LIBRARY FilePortionSourceLib as '/home/dbadmin/FP.so';

=> CREATE LIBRARY DelimFilePortionParserLib as '/home/dbadmin/Delim.so';

=> CREATE SOURCE FilePortionSource AS
LANGUAGE 'C++' NAME 'FilePortionSourceFactory' LIBRARY FilePortionSourceLib;

=> CREATE PARSER DelimFilePortionParser AS
LANGUAGE 'C++' NAME 'DelimFilePortionParserFactory' LIBRARY DelimFilePortionParserLib;

以下示例显示了如何使用源和解析器来加载数据:


=> COPY t WITH SOURCE FilePortionSource(file='g1/*.dat') PARSER DelimFilePortionParser(delimiter = '|',
    record_terminator = '~');

4.3 - 组合协作解析和分摊加载

您可以在同一解析器中同时启用协作解析分摊加载,从而允许 Vertica 决定如何加载数据。

决定如何拆分加载

Vertica 在查询规划时会尽可能使用分摊加载。它决定是否在执行时也使用协作解析。

分摊加载需要 SourceFactory 支持。如果指定合适的 UDSource,Vertica 将在规划时对 ParserFactory 调用 isParserApportionable() 方法。如果此方法返回 true,Vertica 将分摊加载。

如果 isParserApportionable() 返回 false,但 isChunkerApportionable() 返回 true,则块分割器可用于协作解析且块分割器支持分摊加载。Vertica 会分摊加载。

如果这些方法均未返回 true,则 Vertica 不会分摊加载。

在执行时,Vertica 会先检查加载是否在非隔离模式下运行,并仅在非隔离模式下运行才继续操作。在隔离模式下,不支持协作解析。

如果未分摊加载且有多个线程可用,Vertica 将使用协作解析。

如果已分摊加载且正好只有一个线程可用,则 Vertica 在当且仅当解析器不可分摊时才使用协作解析。在这种情况下,块分割器是可分摊的,但解析器是不可分摊的。

如果已分摊加载、有多个线程可用且块分割器是可分摊的,则 Vertica 会使用协作解析。

如果 Vertica 使用协作解析,但 prepareChunker() 未返回 UDChunker 实例,则 Vertica 会报告错误。

执行分摊的协作加载

如果加载同时使用分摊加载和协作解析,则 Vertica 会使用 SourceFactory 将输入分解为多个部分。然后,它将这些部分分配给执行节点。请参阅 Vertica 如何分摊加载

在执行节点上,Vertica 会调用块分割器的 alignPortion() 方法以将输入与部分边界对齐。(此步骤在第一部分已跳过,且根据定义已在开始时对齐。)必须执行此步骤,因为使用分摊加载的解析器有时必须在相应部分结尾之后继续读取,因此块分割器需要找到终点。

对齐相应部分后,Vertica 会重复调用块分割器的 process() 方法。请参阅 Vertica 如何拆分加载

然后,将块分割器找到的各个块发送给解析器的 process() 方法以按照常规方式进行处理。

5 - 连续加载

ContinuousUDSourceContinuousUDFilterContinuousUDParser 类允许您根据需要写入和处理数据,而不必遍历数据。Python API 不支持连续加载。

每个类均包含以下函数:

  • initialize() - 已在 run() 之前调用。您可以选择覆盖此函数以执行设置和初始化。

  • run() - 处理数据。

  • deinitialize() - 已在 run() 返回之后调用。您可以选择覆盖此函数以执行分解和消除。

请勿覆盖从父类继承的 setup()process()destroy() 函数。

您可以使用 yield() 函数在服务器空闲或陷入忙循环期间将控制权交回服务器,以便该服务器可以检查状态变化或取消查询。

这三个类会使用关联的 ContinuousReaderContinuousWriter 类来读取输入数据和写入输出数据。

ContinuousUDSource API (C++)

ContinuousUDSource 类会扩展 UDSource 并添加以下方法以通过子类扩展:

virtual void initialize(ServerInterface &srvInterface);

virtual void run();

virtual void deinitialize(ServerInterface &srvInterface);

ContinuousUDFilter API (C++)

ContinuousUDFilter 类会扩展 UDFilter 并添加以下方法以通过子类扩展:

virtual void initialize(ServerInterface &srvInterface);

virtual void run();

virtual void deinitialize(ServerInterface &srvInterface);

ContinuousUDParser API

ContinuousUDParser 类会扩展 UDParser 并添加以下方法以通过子类扩展:

virtual void initialize(ServerInterface &srvInterface);

virtual void run();

virtual void deinitialize(ServerInterface &srvInterface);

ContinuousUDParser 类会扩展 UDParser 并添加以下方法以通过子类扩展:


public void initialize(ServerInterface srvInterface, SizedColumnTypes returnType);

public abstract void run() throws UdfException;

public void deinitialize(ServerInterface srvInterface, SizedColumnTypes returnType);

有关其他实用程序方法,请参阅 API 文档。

6 - 缓冲区类

缓冲区类可用作所有 UDL 函数的原始数据流的句柄。C++ 和 Java API 对输入和输出使用单个 DataBuffer 类。Python API 包含两个类:InputBufferOutputBuffer

DataBuffer API(C++、java)

DataBuffer 类具有指向缓冲区和大小的指针,以及指示已使用的流量的偏移量。

/**
* A contiguous in-memory buffer of char *
*/
    struct DataBuffer {
    /// Pointer to the start of the buffer
    char * buf;

    /// Size of the buffer in bytes
    size_t size;

    /// Number of bytes that have been processed by the UDL
    size_t offset;
};

DataBuffer 类具有可指示已使用的流量的偏移量。因为 Java 是一种其字符串需要注意字符编码的语言,所以 UDx 必须对缓冲区进行解码或编码。解析器可以通过直接访问缓冲区来与流交互。

/**
* DataBuffer is a a contiguous in-memory buffer of data.
*/
public class DataBuffer {

/**
* The buffer of data.
*/
public byte[] buf;

/**
* An offset into the buffer that is typically used to track progress
* through the DataBuffer. For example, a UDParser advances the
* offset as it consumes data from the DataBuffer.
*/
public int offset;}

InputBuffer API 和 OutputBuffer API (Python)

Python InputBuffer 和 OutputBuffer 类会取代 C++ 和 Java API 中的 DataBuffer 类。

InputBuffer 类

InputBuffer 类会根据指定的编码来解码和转换原始数据流。Python 原本就支持各种语言和编解码器。InputBuffer 是 UDFilters 和 UDParsers 的 process() 方法的实参。用户会通过调用 InputBuffer 的方法来与 UDL 的数据流进行交互

如果没有为 setEncoding() 指定值,Vertica 会假设值为 NONE。

class InputBuffer:
    def getSize(self):
        ...
    def getOffset(self):
    ...

    def setEncoding(self, encoding):
        """
        Set the encoding of the data contained in the underlying buffer
        """
        pass

    def peek(self, length = None):
        """
        Copy data from the input buffer into Python.
        If no encoding has been specified, returns a Bytes object containing raw data.
        Otherwise, returns data decoded into an object corresponding to the specified encoding
        (for example, 'utf-8' would return a string).
        If length is None, returns all available data.
        If length is not None then the length of the returned object is at most what is requested.
        This method does not advance the buffer offset.
        """
        pass

    def read(self, length = None):
        """
        See peek().
        This method does the same thing as peek(), but it also advances the
        buffer offset by the number of bytes consumed.
        """
        pass

        # Advances the DataBuffer offset by a number of bytes equal to the result
        # of calling "read" with the same arguments.
        def advance(self, length = None):
        """
        Advance the buffer offset by the number of bytes indicated by
        the length and encoding arguments.  See peek().
    Returns the new offset.
        """
        pass

OutputBuffer 类

OutputBuffer 类会对 Python 中的数据进行编码并输出到 Vertica。OutputBuffer 是 UDFilters 和 UDParsers 的 process() 方法的实参。用户会通过调用 OutputBuffer 的方法来操作数据并进行编码,从而与 UDL 的数据流进行交互。

write() 方法会将所有数据从 Python 客户端传输到 Vertica。输出缓冲区可以接受任何大小的对象。如果用户向 OutputBuffer 写入的对象大于 Vertica 可以立即处理的大小,则 Vertica 会存储溢出。在下一次调用 process() 期间,Vertica 会检查剩余数据。如果有剩余数据,Vertica 会先将其复制到 DataBuffer,然后再确定是否需要从 Python UDL 调用 process()

如果没有为 setEncoding() 指定值,Vertica 会假设值为 NONE。

class OutputBuffer:
def setEncoding(self, encoding):
"""
Specify the encoding of the data which will be written to the underlying buffer
"""
pass
def write(self, data):
"""
Transfer bytes from the data object into Vertica.
If an encoding was specified via setEncoding(), the data object will be converted to bytes using the specified encoding.
Otherwise, the data argument is expected to be a Bytes object and is copied to the underlying buffer.
"""
pass