这是本节的多页打印视图。 点击此处打印.

返回本页常规视图.

转换函数 (UDTF)

用户定义的转换函数 (UDTF) 可用于将数据表转换成其他表。该函数读取一个或多个参数(视为数据行),并返回包含一列或多列的零行或多行数据。UDTF 可以生成任意数量的行作为输出。但是,输出的每个行必须是完整的。在未向每个列添加值的情况下前进到下一行会生成不正确的结果。

输出表的架构并不需要与输入表的架构相对应,它们可以完全不同。UDTF 可为每行输入返回任意行输出。

UDTF 只能用在仅包含 UDTF 调用和所需 OVER 子句的 SELECT 列表中。多阶段 UDTF 可以使用分区列 (PARTITION BY),但其他 UDTF 不能。

在语句中与 GROUP BY 和 ORDER BY 结合使用时,UDTF 在 GROUP BY 之后运行,但在最后一个 ORDER BY 之前运行。ORDER BY 子句可能仅包含窗口分区子句中的列或表达式(请参阅窗口分区)。

UDTF 最多可读取 9800 个参数(输入列)。尝试向 UDTF 传递更多参数会返回错误。

1 - TransformFunction 类

您可以在 TransformFunction 类中执行数据处理,以及将输入行转换为输出行。子类必须定义 processPartition() 方法。该类可定义多种方法以设置和分解该函数。

执行转换

processPartition() 方法可执行您希望 UDTF 执行的所有处理。当用户在 SQL 语句中调用您的函数时,Vertica 会将来自函数参数的数据绑定在一起并将其传递至 processPartition()

processPartition() 方法的输入和输出由 PartitionReaderPartitionWriter 类的对象提供。这些类定义了用于为 UDTF 读取输入数据和写入输出数据的方法。

UDTF 不一定像 UDSF 那样对单行进行操作。UDTF 可以随时读取任意数量的行并写入输出。

在实施 processPartition() 时,请遵循以下准则:

  • PartitionReader 对象中调用特定于数据类型的函数以提取每个输入参数。这些函数全部使用一个参数:要读取的输入行的列号。函数可能需要处理 NULL 值。

  • 写入输出时,UDTF 必须提供在工厂中定义的所有输出列的值。与读取输入列类似,PartitionWriter 对象具备将每种类型的数据写入输出行的函数。

  • 请使用 PartitionReader.next() 确定是否还有更多输入需要处理,并在输入处理完时退出。

  • 在某些情况下,您可能需要使用 PartitionReadergetNumCols()getTypeMetaData() 函数来确定参数的数量和类型,而非仅对输入行中列的数据类型进行硬编码。如果希望 TransformFunction 能够处理具有不同架构的输入表,则此方法很有用。然后,您可以使用不同的 TransformFunctionFactory 类定义调用同一个 TransformFunction 类的多个函数签名。有关详细信息,请参阅重载 UDx

设置和分解

TransformFunction 类定义了两种其他方法,您可以选择性地实施这两种方法以分配和释放资源: setup()destroy()。您应使用这些方法来分配和取消分配那些不通过 UDx API 分配的资源(有关详细信息,请参阅为 UDx 分配资源)。

API

TransformFunction API 提供了以下通过子类扩展的方法:

virtual void setup(ServerInterface &srvInterface,
        const SizedColumnTypes &argTypes);

virtual void processPartition(ServerInterface &srvInterface,
        PartitionReader &input_reader, PartitionWriter &output_writer)=0;

virtual void cancel(ServerInterface &srvInterface);

virtual void destroy(ServerInterface &srvInterface,
        const SizedColumnTypes &argTypes);

PartitionReaderPartitionWriter 类会为列值提供 getter 和 setter,以及用于遍历分区的 next()。有关详细信息,请参阅 API 参考文档。

TransformFunction API 提供了以下通过子类扩展的方法:

public void setup(ServerInterface srvInterface, SizedColumnTypes argTypes);

public abstract void processPartition(ServerInterface srvInterface,
        PartitionReader input_reader, PartitionWriter input_writer)
    throws UdfException, DestroyInvocation;

protected void cancel(ServerInterface srvInterface);

public void destroy(ServerInterface srvInterface, SizedColumnTypes argTypes);

PartitionReaderPartitionWriter 类会为列值提供 getter 和 setter,以及用于遍历分区的 next()。有关详细信息,请参阅 API 参考文档。

TransformFunction API 提供了以下通过子类扩展的方法:


def setup(self, server_interface, col_types)

def processPartition(self, server_interface, partition_reader, partition_writer)

def destroy(self, server_interface, col_types)

PartitionReaderPartitionWriter 类会为列值提供 getter 和 setter,以及用于遍历分区的 next()。有关详细信息,请参阅 API 参考文档。

实施 主函数 API 以定义转换函数:

FunctionName <- function(input.data.frame, parameters.data.frame) {
  # Computations

  # The function must return a data frame.
  return(output.data.frame)
}

2 - TransformFunctionFactory 类

TransformFunctionFactory 类将向 Vertica 提供有关 UDTF 的元数据:其参数的数量和数据类型及其返回值的数据类型。该类还会实例化 TransformFunction 的子类。

您必须在 TransformFunctionFactory 中实施以下方法:

  • getPrototype() 将返回两个 ColumnTypes 对象,这两个对象描述 UDTF 将用作输入的列以及作为输出返回的列。

  • getReturnType() 将向 Vertica 提供有关输出值的详细信息:可变大小数据类型(例如 VARCHAR)的宽度和具有可设置精度的数据类型(例如 TIMESTAMP)的精度。您还可以使用此函数设置输出列的名称。虽然此方法对于返回单个值的 UDx 是可选的,但您必须为 UDTF 实施该方法。

  • createTransformFunction() 实例化 TransformFunction 子类。

对于使用 C++ 语言编写的转换函数,您可以提供有助于优化查询的信息。请参阅提高查询性能(仅限 C++)

API

TransformFunctionFactory API 提供了以下通过子类扩展的方法:

virtual TransformFunction *
    createTransformFunction (ServerInterface &srvInterface)=0;

virtual void getPrototype(ServerInterface &srvInterface,
            ColumnTypes &argTypes, ColumnTypes &returnType)=0;

virtual void getReturnType(ServerInterface &srvInterface,
            const SizedColumnTypes &argTypes,
            SizedColumnTypes &returnType)=0;

virtual void getParameterType(ServerInterface &srvInterface,
            SizedColumnTypes &parameterTypes);

TransformFunctionFactory API 提供了以下通过子类扩展的方法:

public abstract TransformFunction createTransformFunction(ServerInterface srvInterface);

public abstract void getPrototype(ServerInterface srvInterface, ColumnTypes argTypes, ColumnTypes returnType);

public abstract void getReturnType(ServerInterface srvInterface, SizedColumnTypes argTypes,
        SizedColumnTypes returnType) throws UdfException;

public void getParameterType(ServerInterface srvInterface, SizedColumnTypes parameterTypes);

TransformFunctionFactory API 提供了以下通过子类扩展的方法:


def createTransformFunction(self, srv)

def getPrototype(self, srv_interface, arg_types, return_type)

def getReturnType(self, srv_interface, arg_types, return_type)

def getParameterType(self, server_interface, parameterTypes)

实施 工厂函数 API 以定义转换函数工厂:

FunctionNameFactory <- function() {
  list(name    = FunctionName,
       udxtype = c("scalar"),
       intype  = c("int"),
       outtype = c("int"))
}

3 - MultiPhaseTransformFunctionFactory 类

使用多阶段 UDTF,您可以将数据处理分为多个步骤。使用此功能后,您的 UDTF 可以使用类似于 Hadoop 或其他 MapReduce 框架的方式执行处理。您可以使用第一阶段来分解并收集数据,然后使用后续阶段来处理数据。例如,您的 UDTF 的第一阶段可以是从存储在表列的 Web 服务器日志中提取特定类型的用户交互,而且后续阶段可以是对这些交互执行分析。

多阶段 UDTF 还能让您决定在什么地方进行处理:在每个节点本地,还是在整个群集内。如果您的多阶段 UDTF 类似于 MapReduce 流程,您可能希望在多阶段 UDTF 的第一阶段处理存储在本地(运行 UDTF 实例的节点)的数据。这样可防止在 Vertica 群集中复制较大的数据段。根据在后续阶段执行的处理类型,您可以选择对数据进行分段并分布在 Vertica 群集中。

UDTF 的每个阶段都与传统(单阶段)UDTF 相同:它可以接收作为输入的表,然后生成作为输出的表。每个阶段的输出的架构不需要与其输入匹配,而且每个阶段可以根据需要输出大量或少量行。

要定义由每个阶段执行的处理,请创建 TransformFunction 的子类。如果已从单阶段 UDTF(用于执行您希望多阶段 UDTF 的其中一个阶段执行的处理)创建了 TransformFunction,则您可以轻松调整该子类以使其在多阶段 UDTF 中工作。

多阶段 UDTF 和传统 UDTF 不同之处是所使用的工厂类。应使用 MultiPhaseTransformFunctionFactory 的子类定义多阶段 UDTF,而非 TransformFunctionFactory。此特殊工厂类用作多步骤 UDTF 中所有阶段的容器。此工厂类通过 getPrototype() 函数向 Vertica 提供整个多阶段 UDTF 的输入要求和输出要求,以及该 UDTF 中所有阶段的列表。

MultiPhaseTransformFunctionFactory 类的子类中,可以定义 TransformFunctionPhase 的一个或多个子类。对于多阶段 UDTF 中的每个阶段,这些类充当与 TransformFunctionFactory 类相同的角色。这些类定义每个阶段的输入和输出并创建其关联的 TransformFunction 类的实例,以对 UDTF 的每个阶段执行处理。除了这些子类之外,MultiPhaseTransformFunctionFactory 还包含可用于控制每个 TransformFunctionPhase 子类的实例的字段。

API

MultiPhaseTransformFunctionFactory 类扩展了 TransformFunctionFactory API 提供了以下其他方法以通过子类扩展:

virtual void getPhases(ServerInterface &srvInterface,
        std::vector< TransformFunctionPhase * > &phases)=0;

如果使用该工厂,则还必须扩展 TransformFunctionPhase。请参阅 SDK 参考文档。

MultiPhaseTransformFunctionFactory 类扩展了 TransformFunctionFactory。API 提供了以下通过子类扩展的方法:

public abstract void getPhases(ServerInterface srvInterface,
        Vector< TransformFunctionPhase > phases);

如果使用该工厂,则还必须扩展 TransformFunctionPhase。请参阅 SDK 参考文档。

TransformFunctionFactory 类扩展了 TransformFunctionFactory。对于每个阶段,工厂必须定义可扩展 TransformFunctionPhase 的类。

工厂将添加以下方法:


def getPhase(cls, srv)

TransformFunctionPhase 包含以下方法:


def createTransformFunction(cls, srv)

def getReturnType(self, srv_interface, input_types, output_types)

4 - 提高查询性能(仅限 C++)

评估查询时,Vertica 优化器可能会对其输入进行排序以提高性能。如果函数已返回排序后的数据,这意味着优化器正在执行额外的工作。使用 C++ 语言编写的转换函数可以声明其返回的数据的排序方式,这样优化器就可以利用这些信息。

转换函数会在函数的 processPartition() 方法中进行实际排序。要利用这种优化,排序必须为升序。您不需要对所有列进行排序,但必须先返回已排序的列。

您可以在工厂的 getReturnType() 方法中声明函数对其输出进行排序的方式。

PolyTopKPerPartition 示例会对输入列进行排序并返回给定的行数:

=> SELECT polykSort(14, a, b, c) OVER (ORDER BY a, b, c)
    AS (sort1,sort2,sort3) FROM observations ORDER BY 1,2,3;
 sort1 | sort2 | sort3
-------+-------+-------
     1 |     1 |     1
     1 |     1 |     2
     1 |     1 |     3
     1 |     2 |     1
     1 |     2 |     2
     1 |     3 |     1
     1 |     3 |     2
     1 |     3 |     3
     1 |     3 |     4
     2 |     1 |     1
     2 |     1 |     2
     2 |     2 |     3
     2 |     2 |    34
     2 |     3 |     5
(14 rows)

工厂会通过在每一列上设置 isSortedBy 属性在 getReturnType() 中声明此排序。每个 SizedColumnType 都存在关联的 Properties 对象,您可以在其中设置此值:


virtual void getReturnType(ServerInterface &srvInterface, const SizedColumnTypes &inputTypes, SizedColumnTypes &outputTypes)
{
    vector<size_t> argCols; // Argument column indexes.
    inputTypes.getArgumentColumns(argCols);
    size_t colIdx = 0;

    for (vector<size_t>::iterator i = argCols.begin() + 1; i < argCols.end(); i++)
    {
        SizedColumnTypes::Properties props;
        props.isSortedBy = true;
        std::stringstream cname;
        cname << "col" << colIdx++;
        outputTypes.addArg(inputTypes.getColumnType(*i), cname.str(), props);
    }
}

通过查看包含和不包含此设置的查询的 EXPLAIN 计划,可以查看此优化的效果。以下输出显示了 polyk(未排序的版本)的查询计划。记录排序成本:

=> EXPLAN SELECT polyk(14, a, b, c) OVER (ORDER BY a, b, c)
    FROM observations ORDER BY 1,2,3;

 Access Path:
 +-SORT [Cost: 2K, Rows: 10K] (PATH ID: 1)
 |  Order: col0 ASC, col1 ASC, col2 ASC
 | +---> ANALYTICAL [Cost: 2K, Rows: 10K] (PATH ID: 2)
 | |      Analytic Group
 | |       Functions: polyk()
 | |       Group Sort: observations.a ASC NULLS LAST, observations.b ASC NULLS LAST, observations.c ASC NULLS LAST
 | | +---> STORAGE ACCESS for observations [Cost: 2K, Rows: 10K]
 (PATH ID: 3)
 | | |      Projection: public.observations_super
 | | |      Materialize: observations.a, observations.b, observations.c

已排序版本的查询计划会忽略此步骤(从而节省成本)并从分析步骤开始(上一个计划中的第二个步骤):

=> EXPLAN SELECT polykSort(14, a, b, c) OVER (ORDER BY a, b, c)
    FROM observations ORDER BY 1,2,3;

Access Path:
 +-ANALYTICAL [Cost: 2K, Rows: 10K] (PATH ID: 2)
 |  Analytic Group
 |   Functions: polykSort()
 |   Group Sort: observations.a ASC NULLS LAST, observations.b ASC NULLS LAST, observations.c ASC NULLS LAST
 | +---> STORAGE ACCESS for observations [Cost: 2K, Rows: 10K] (PATH ID: 3)
 | |      Projection: public.observations_super
 | |      Materialize: observations.a, observations.b, observations.c

5 - 用于处理本地数据的分区选项

UDTF 通常以特定方式处理已分区的数据。例如,处理 Web 服务器日志文件以计算每个合作伙伴网站引用的点击数的 UDTF 需要具有按引用网站列分区的输入。UDTF 的每个实例将查看特定合作伙伴站点引用的点击,以便计算其数量。

在与此类似的情况下,窗口分区子句应使用 PARTITION BY 子句。群集中的每个节点将对其存储的数据进行分区并将部分分区发送给其他节点,然后合并从其他节点收到的分区并运行 UDTF 的一个实例以处理这些分区。

在其他情况下,UDTF 可能不需要以特殊方式对输入数据进行分区,例如,从 Apache 日志文件解析数据的 UDTF。在这种情况下,您可以指定每个 UDTF 实例仅处理由节点(该实例正在此节点上运行)存储在本地的数据。通过消除对数据进行分区的开销,可以大大提高处理效率。

可以使用以下窗口分区选项之一来指示 UDTF 仅处理本地数据:

  • PARTITION BEST:仅对于线程安全的 UDTF,可通过多个节点间的多线程查询来优化性能。

  • PARTITION NODES:优化多个节点间的单线程查询的性能。

查询必须指定将在所有节点之间复制且包含一行的源表(类似于 DUAL 表)。例如,以下语句将调用可解析存储在本地的 Apache 日志文件的 UDTF:

=> CREATE TABLE rep (dummy INTEGER) UNSEGMENTED ALL NODES;
CREATE TABLE
=> INSERT INTO rep VALUES (1);
 OUTPUT
--------
      1
(1 row)
=> SELECT ParseLogFile('/data/apache/log*') OVER (PARTITION BEST) FROM rep;

6 - C++ 示例:字符串分词器

以下示例显示了 TransformFunction 的子类,它名为 StringTokenizer。此子类定义的 UDTF 可读取包含 INTEGER ID 列和 VARCHAR 列的表。此子类可将 VARCHAR 列中的文本拆分为标记(各个词)。此子类将返回一个表,表中包含每个标记、标记所出现在的行以及标记在字符串中的位置。

加载和使用示例

以下示例显示了如何将函数加载至 Vertica 中。假设包含该函数的 TransformFunctions.so 库已复制到启动程序节点上数据库管理员用户的主目录中。

=> CREATE LIBRARY TransformFunctions AS
   '/home/dbadmin/TransformFunctions.so';
CREATE LIBRARY
=> CREATE TRANSFORM FUNCTION tokenize
   AS LANGUAGE 'C++' NAME 'TokenFactory' LIBRARY TransformFunctions;
CREATE TRANSFORM FUNCTION

然后,您可以通过 SQL 语句使用该函数,例如:


=> CREATE TABLE T (url varchar(30), description varchar(2000));
CREATE TABLE
=> INSERT INTO T VALUES ('www.amazon.com','Online retail merchant and provider of cloud services');
 OUTPUT
--------
      1
(1 row)
=> INSERT INTO T VALUES ('www.vertica.com','World''s fastest analytic database');
 OUTPUT
--------
      1
(1 row)
=> COMMIT;
COMMIT

=> -- Invoke the UDTF
=> SELECT url, tokenize(description) OVER (partition by url) FROM T;
       url       |   words
-----------------+-----------
 www.amazon.com  | Online
 www.amazon.com  | retail
 www.amazon.com  | merchant
 www.amazon.com  | and
 www.amazon.com  | provider
 www.amazon.com  | of
 www.amazon.com  | cloud
 www.amazon.com  | services
 www.vertica.com | World's
 www.vertica.com | fastest
 www.vertica.com | analytic
 www.vertica.com | database
(12 rows)

请注意,结果表中的行数和列数与输入表中不同。这是 UDTF 的优势之一。

TransformFunction 实施

以下代码显示了 StringTokenizer 类。

class StringTokenizer : public TransformFunction
{
  virtual void processPartition(ServerInterface &srvInterface,
                                PartitionReader &inputReader,
                                PartitionWriter &outputWriter)
  {
    try {
      if (inputReader.getNumCols() != 1)
        vt_report_error(0, "Function only accepts 1 argument, but %zu provided", inputReader.getNumCols());

      do {
        const VString &sentence = inputReader.getStringRef(0);

        // If input string is NULL, then output is NULL as well
        if (sentence.isNull())
          {
            VString &word = outputWriter.getStringRef(0);
            word.setNull();
            outputWriter.next();
          }
        else
          {
            // Otherwise, let's tokenize the string and output the words
            std::string tmp = sentence.str();
            std::istringstream ss(tmp);

            do
              {
                std::string buffer;
                ss >> buffer;

                // Copy to output
                if (!buffer.empty()) {
                  VString &word = outputWriter.getStringRef(0);
                  word.copy(buffer);
                  outputWriter.next();
                }
              } while (ss);
          }
      } while (inputReader.next() && !isCanceled());
    } catch(std::exception& e) {
      // Standard exception. Quit.
      vt_report_error(0, "Exception while processing partition: [%s]", e.what());
    }
  }
};

此示例中的 processPartition() 函数将遵循您在自己的 UDTF 中遵循的相同模式:遍历 Vertica 向其发送的表分区中的所有行,以处理每个行并在前进之前检查是否已取消查询。对于 UDTF,您实际上不必处理每个行。即使退出函数而不读取所有输入,也不会出现任何问题。如果 UDTF 在执行某种搜索或其他某项操作后确定其余资源是不需要的,您可以选择执行此操作。

在此示例中,processPartition() 会先从 PartitionReader 对象中提取包含文本的 VStringVString 类代表 Vertica 字符串值(VARCHAR 或 CHAR)。如果存在输入,它会对其进行字符串标记并使用 PartitionWriter 对象将其添加到输出中。

与读取输入列类似,PartitionWriter 类具有将每种类型的数据写入输出行的函数。在这种情况下,该示例会调用 PartitionWriter 对象的 getStringRef() 函数来分配新的 VString 对象以保存第一列的输出标记,然后将标记的值复制到 VString 中。

TransformFunctionFactory 实施

以下代码显示了该工厂类。

class TokenFactory : public TransformFunctionFactory
{
  // Tell Vertica that we take in a row with 1 string, and return a row with 1 string
  virtual void getPrototype(ServerInterface &srvInterface, ColumnTypes &argTypes, ColumnTypes &returnType)
  {
    argTypes.addVarchar();
    returnType.addVarchar();
  }

  // Tell Vertica what our return string length will be, given the input
  // string length
  virtual void getReturnType(ServerInterface &srvInterface,
                             const SizedColumnTypes &inputTypes,
                             SizedColumnTypes &outputTypes)
  {
    // Error out if we're called with anything but 1 argument
    if (inputTypes.getColumnCount() != 1)
      vt_report_error(0, "Function only accepts 1 argument, but %zu provided", inputTypes.getColumnCount());

    int input_len = inputTypes.getColumnType(0).getStringLength();

    // Our output size will never be more than the input size
    outputTypes.addVarchar(input_len, "words");
  }

  virtual TransformFunction *createTransformFunction(ServerInterface &srvInterface)
  { return vt_createFuncObject<StringTokenizer>(srvInterface.allocator); }

};

在此示例中:

  • UDTF 会将 VARCHAR 列作为输入。为了定义输入列,getPrototype() 会在表示输入表的 ColumnTypes 对象上调用 addVarchar()

  • UDTF 将返回 VARCHAR 作为输出。getPrototype() 函数会调用 addVarchar() 以定义输出表。

此示例必须返回 VARCHAR 输出列的最大长度。它会将长度设置为输入字符串的长度。这是一个安全值,因为输出长度永远不会超过输入字符串。此示例还会将 VARCHAR 输出列的名称设置为“words”。

此示例中的 createTransformFunction() 函数实施是样板代码。该实施仅使用与此工厂类关联的 TransformFunction 类的名称来调用 vt_returnFuncObj 宏。此宏负责将 TransformFunction 类的副本实例化,Vertica 可以使用该副本来处理数据。

RegisterFactory 宏

创建 UDTF 的最后一步是调用 RegisterFactory 宏。此宏可确保在 Vertica 加载包含 UDTF 的共享库时,工厂类已实例化。只有将工厂类初始化,Vertica 才能找到 UDTF 并确定其输入和输出,除此之外没有任何其他方法。

RegisterFactory 宏仅使用工厂类的名称:

RegisterFactory(TokenFactory);

7 - Python 示例:字符串分词器

以下示例显示将输入字符串分解为标记(基于空格)的转换函数。它类似于 C++ 和 Java 的分词器示例。

加载和使用示例

创建库和函数:

=> CREATE LIBRARY pyudtf AS '/home/dbadmin/udx/tokenize.py' LANGUAGE 'Python';
CREATE LIBRARY
=> CREATE TRANSFORM FUNCTION tokenize AS NAME 'StringTokenizerFactory' LIBRARY pyudtf;
CREATE TRANSFORM FUNCTION

然后,您可以在 SQL 语句中使用该函数,例如:

=> CREATE TABLE words (w VARCHAR);
CREATE TABLE
=> COPY words FROM STDIN;
Enter data to be copied followed by a newline.
End with a backslash and a period on a line by itself.
>> this is a test of the python udtf
>> \.

=> SELECT tokenize(w) OVER () FROM words;
  token
----------
 this
 is
 a
 test
 of
 the
 python
 udtf
(8 rows)

设置

所有 Python UDx 都必须导入 Vertica SDK。

import vertica_sdk

UDTF Python 代码

以下代码定义了分词器及其工厂。

class StringTokenizer(vertica_sdk.TransformFunction):
    """
    Transform function which tokenizes its inputs.
    For each input string, each of the whitespace-separated tokens of that
    string is produced as output.
    """
    def processPartition(self, server_interface, input, output):
        while True:
            for token in input.getString(0).split():
                output.setString(0, token)
                output.next()
            if not input.next():
                break


class StringTokenizerFactory(vertica_sdk.TransformFunctionFactory):
    def getPrototype(self, server_interface, arg_types, return_type):
        arg_types.addVarchar()
        return_type.addVarchar()
    def getReturnType(self, server_interface, arg_types, return_type):
        return_type.addColumn(arg_types.getColumnType(0), "tokens")
    def createTransformFunction(cls, server_interface):
        return StringTokenizer()

8 - R 示例:日志分词器

LogTokenizer 转换函数会从表中读取可变长字符串,即日志消息。然后,它会标记每条日志消息的字符串,以返回每个标记。

您可以在 Vertica Github 存储库中找到更多 UDx 示例:https://github.com/vertica/UDx-Examples

加载函数和库

创建库和函数。

=> CREATE OR REPLACE LIBRARY rLib AS 'log_tokenizer.R' LANGUAGE 'R';
CREATE LIBRARY
=> CREATE OR REPLACE TRANSFORM FUNCTION LogTokenizer AS LANGUAGE 'R' NAME 'LogTokenizerFactory' LIBRARY rLib FENCED;
CREATE FUNCTION

使用函数查询数据

以下查询显示了如何使用 UDTF 运行查询。

=> SELECT machine,
          LogTokenizer(error_log USING PARAMETERS spliton = ' ') OVER(PARTITION BY machine)
     FROM error_logs;
 machine |  Token
---------+---------
 node001 | ERROR
 node001 | 345
 node001 | -
 node001 | Broken
 node001 | pipe
 node001 | WARN
 node001 | -
 node001 | Nearly
 node001 | filled
 node001 | disk
 node002 | ERROR
 node002 | 111
 node002 | -
 node002 | Flooded
 node002 | roads
 node003 | ERROR
 node003 | 222
 node003 | -
 node003 | Plain
 node003 | old
 node003 | broken
(21 rows)

UDTF R 代码


LogTokenizer <- function(input.data.frame, parameters.data.frame) {
  # Take the spliton parameter passed by the user and assign it to a variable
  # in the function so we can use that as our tokenizer.
  if ( is.null(parameters.data.frame[['spliton']]) ) {
    stop("NULL value for spliton! Token cannot be NULL.")
  } else {
    split.on <- as.character(parameters.data.frame[['spliton']])
  }
  # Tokenize the string.
  tokens <- vector(length=0)
  for ( string in input.data.frame[, 1] ) {
    tokenized.string <- strsplit(string, split.on)
    for ( token in tokenized.string ) {
      tokens <- append(tokens, token)
    }
  }
  final.output <- data.frame(tokens)
  return(final.output)
}

LogTokenizerFactory <- function() {
  list(name    = LogTokenizer,
       udxtype = c("transform"),
       intype  = c("varchar"),
       outtype = c("varchar"),
       outtypecallback=LogTokenizerReturn,
       parametertypecallback=LogTokenizerParameters)
}


LogTokenizerParameters <- function() {
  parameters <- list(datatype = c("varchar"),
                     length   = c("NA"),
                     scale    = c("NA"),
                     name     = c("spliton"))
  return(parameters)
}

LogTokenizerReturn <- function(arg.data.frame, parm.data.frame) {
  output.return.type <- data.frame(datatype = rep(NA,1),
                                   length   = rep(NA,1),
                                   scale    = rep(NA,1),
                                   name     = rep(NA,1))
  output.return.type$datatype <- c("varchar")
  output.return.type$name <- c("Token")
  return(output.return.type)
}

9 - C++ 示例:多阶段索引器

以下代码片段来自随 Vertica SDK 一起分发的 InvertedIndex UDTF 示例。此代码片段演示了将 MultiPhaseTransformFunctionFactory 子类化以及添加两个 TransformFunctionPhase 子类(定义此 UDTF 中的两个阶段)。

class InvertedIndexFactory : public MultiPhaseTransformFunctionFactory
{
public:
   /**
    * Extracts terms from documents.
    */
   class ForwardIndexPhase : public TransformFunctionPhase
   {
       virtual void getReturnType(ServerInterface &srvInterface,
                                  const SizedColumnTypes &inputTypes,
                                  SizedColumnTypes &outputTypes)
       {
           // Sanity checks on input we've been given.
           // Expected input: (doc_id INTEGER, text VARCHAR)
           vector<size_t> argCols;
           inputTypes.getArgumentColumns(argCols);
           if (argCols.size() < 2 ||
               !inputTypes.getColumnType(argCols.at(0)).isInt() ||
               !inputTypes.getColumnType(argCols.at(1)).isVarchar())
               vt_report_error(0, "Function only accepts two arguments"
                                "(INTEGER, VARCHAR))");
           // Output of this phase is:
           //   (term_freq INTEGER) OVER(PBY term VARCHAR OBY doc_id INTEGER)
           // Number of times term appears within a document.
           outputTypes.addInt("term_freq");
           // Add analytic clause columns: (PARTITION BY term ORDER BY doc_id).
           // The length of any term is at most the size of the entire document.
           outputTypes.addVarcharPartitionColumn(
                inputTypes.getColumnType(argCols.at(1)).getStringLength(),
                "term");
           // Add order column on the basis of the document id's data type.
           outputTypes.addOrderColumn(inputTypes.getColumnType(argCols.at(0)),
                                      "doc_id");
       }
       virtual TransformFunction *createTransformFunction(ServerInterface
                &srvInterface)
       { return vt_createFuncObj(srvInterface.allocator, ForwardIndexBuilder); }
   };
   /**
    * Constructs terms' posting lists.
    */
   class InvertedIndexPhase : public TransformFunctionPhase
   {
       virtual void getReturnType(ServerInterface &srvInterface,
                                  const SizedColumnTypes &inputTypes,
                                  SizedColumnTypes &outputTypes)
       {
           // Sanity checks on input we've been given.
           // Expected input:
           //   (term_freq INTEGER) OVER(PBY term VARCHAR OBY doc_id INTEGER)
           vector<size_t> argCols;
           inputTypes.getArgumentColumns(argCols);
           vector<size_t> pByCols;
           inputTypes.getPartitionByColumns(pByCols);
           vector<size_t> oByCols;
           inputTypes.getOrderByColumns(oByCols);
           if (argCols.size() != 1 || pByCols.size() != 1 || oByCols.size() != 1 ||
               !inputTypes.getColumnType(argCols.at(0)).isInt() ||
               !inputTypes.getColumnType(pByCols.at(0)).isVarchar() ||
               !inputTypes.getColumnType(oByCols.at(0)).isInt())
               vt_report_error(0, "Function expects an argument (INTEGER) with "
                               "analytic clause OVER(PBY VARCHAR OBY INTEGER)");
           // Output of this phase is:
           //   (term VARCHAR, doc_id INTEGER, term_freq INTEGER, corp_freq INTEGER).
           outputTypes.addVarchar(inputTypes.getColumnType(
                                    pByCols.at(0)).getStringLength(),"term");
           outputTypes.addInt("doc_id");
           // Number of times term appears within the document.
           outputTypes.addInt("term_freq");
           // Number of documents where the term appears in.
           outputTypes.addInt("corp_freq");
       }

       virtual TransformFunction *createTransformFunction(ServerInterface
                &srvInterface)
       { return vt_createFuncObj(srvInterface.allocator, InvertedIndexBuilder); }
   };
   ForwardIndexPhase fwardIdxPh;
   InvertedIndexPhase invIdxPh;
   virtual void getPhases(ServerInterface &srvInterface,
        std::vector<TransformFunctionPhase *> &phases)
   {
       fwardIdxPh.setPrepass(); // Process documents wherever they're originally stored.
       phases.push_back(&fwardIdxPh);
       phases.push_back(&invIdxPh);
   }
   virtual void getPrototype(ServerInterface &srvInterface,
                             ColumnTypes &argTypes,
                             ColumnTypes &returnType)
   {
       // Expected input: (doc_id INTEGER, text VARCHAR).
       argTypes.addInt();
       argTypes.addVarchar();
       // Output is: (term VARCHAR, doc_id INTEGER, term_freq INTEGER, corp_freq INTEGER)
       returnType.addVarchar();
       returnType.addInt();
       returnType.addInt();
       returnType.addInt();
   }
};
RegisterFactory(InvertedIndexFactory);

此示例中的大部分代码与 TransformFunctionFactory 类中的代码相似:

  • 这两个 TransformFunctionPhase 子类都实施 getReturnType() 函数,此函数描述每个阶段的输出。这与 TransformFunctionFactory 类中的 getReturnType() 函数相似。但是,此函数还可让您控制如何在多阶段 UDTF 的每个节点之间对数据进行分区和排序。

    第一个阶段调用 SizedColumnTypes::addVarcharPartitionColumn()(而非仅调用 addVarcharColumn()),以将此阶段的输出表设置为按包含提取的词的列进行分区。此阶段还调用 SizedColumnTypes::addOrderColumn(),以便按文档 ID 列对输出表进行排序。此阶段调用该函数而非特定于数据类型的函数(例如 addIntOrderColumn())之一,以便可以将原始列的数据类型传递到输出列。

  • MultiPhaseTransformFunctionFactory 类实施 getPrototype() 函数,此函数定义多阶段 UDTF 的输入和输出的架构。此函数与 TransformFunctionFactory::getPrototype() 函数相同。

MultiPhaseTransformFunctionFactory 类实施的唯一函数是 getPhases()。此函数可定义各个阶段的执行顺序。表示阶段的字段将以预期执行顺序推入该矢量。

您还可以在 MultiPhaseTransformFunctionFactory.getPhases() 函数中将 UDTF 的第一个阶段标记为对存储在节点本地的数据(而非分区到所有节点的数据)执行操作(称为“预通过”阶段)。使用此选项,您不必在 Vertica 群集中四处移动大量数据,从而提高多阶段 UDTF 的效率。

要将第一个阶段标记为预通过,您可以从 getPhase() 函数中调用第一个阶段的 TransformFunctionPhase 实例的 TransformFunctionPhase::setPrepass() 函数。

注意

  • 您需要确保每个阶段的输出架构与下一个阶段所需的输入架构匹配。在示例代码中,每个 TransformFunctionPhase::getReturnType() 实施将对其输入架构和输出架构执行健全性检查。TransformFunction 子类还可以在其 processPartition() 函数中执行这些检查。

  • 您的多阶段 UDTF 可以拥有的阶段数没有内置限制。但,阶段数越多,使用的资源就越多。在隔离模式下运行时,Vertica 可能会终止使用太多内存的 UDTF。请参阅C++ UDx 的资源使用情况

10 - Python 示例:多阶段计算

以下示例显示多阶段转换函数,该函数用于计算输入表中数字列的平均值。它会先定义两个转换函数,然后定义使用这些函数创建阶段的工厂。

有关完整的代码,请参阅示例分发中的 AvgMultiPhaseUDT.py

加载和使用示例

创建库和函数:

=> CREATE LIBRARY pylib_avg AS '/home/dbadmin/udx/AvgMultiPhaseUDT.py' LANGUAGE 'Python';
CREATE LIBRARY
=> CREATE TRANSFORM FUNCTION myAvg AS NAME 'MyAvgFactory' LIBRARY pylib_avg;
CREATE TRANSFORM FUNCTION

然后,您可以在 SELECT 语句中使用该函数:

=> CREATE TABLE IF NOT EXISTS numbers(num FLOAT);
CREATE TABLE

=> COPY numbers FROM STDIN delimiter ',';
1
2
3
4
\.

=> SELECT myAvg(num) OVER() FROM numbers;
 average | ignored_rows | total_rows
---------+--------------+------------
     2.5 |            0 |          4
(1 row)

设置

所有 Python UDx 都必须导入 Vertica SDK。此示例还会导入另一个库。

import vertica_sdk
import math

分量转换函数

多阶段转换函数必须定义两个或更多要在阶段中使用的 TransformFunction 子类。此示例会使用两个类:LocalCalculation(用于对本地分区进行计算)以及 GlobalCalculation(用于聚合所有 LocalCalculation 实例的结果以计算最终结果)。

在这两个函数中,计算都是在 processPartition() 函数中完成:

class LocalCalculation(vertica_sdk.TransformFunction):
    """
    This class is the first phase and calculates the local values for sum, ignored_rows and total_rows.
    """

    def setup(self, server_interface, col_types):
        server_interface.log("Setup: Phase0")
        self.local_sum = 0.0
        self.ignored_rows = 0
        self.total_rows = 0

    def processPartition(self, server_interface, input, output):
        server_interface.log("Process Partition: Phase0")

        while True:
            self.total_rows += 1

            if input.isNull(0) or math.isinf(input.getFloat(0)) or math.isnan(input.getFloat(0)):
                # Null, Inf, or Nan is ignored
                self.ignored_rows += 1
            else:
                self.local_sum += input.getFloat(0)

            if not input.next():
                break

        output.setFloat(0, self.local_sum)
        output.setInt(1, self.ignored_rows)
        output.setInt(2, self.total_rows)
        output.next()

class GlobalCalculation(vertica_sdk.TransformFunction):
    """
    This class is the second phase and aggregates the values for sum, ignored_rows and total_rows.
    """

    def setup(self, server_interface, col_types):
        server_interface.log("Setup: Phase1")
        self.global_sum = 0.0
        self.ignored_rows = 0
        self.total_rows = 0

    def processPartition(self, server_interface, input, output):
        server_interface.log("Process Partition: Phase1")

        while True:
            self.global_sum += input.getFloat(0)
            self.ignored_rows += input.getInt(1)
            self.total_rows += input.getInt(2)

            if not input.next():
                break

        average = self.global_sum / (self.total_rows - self.ignored_rows)

        output.setFloat(0, average)
        output.setInt(1, self.ignored_rows)
        output.setInt(2, self.total_rows)
        output.next()

多阶段工厂

MultiPhaseTransformFunctionFactory 会将各个函数作为阶段绑定在一起。工厂会为每个函数定义 TransformFunctionPhase。每个阶段都会定义 createTransformFunction(),用于调用对应的 TransformFunctiongetReturnType() 的构造函数。

下面是第一阶段 LocalPhase


class MyAvgFactory(vertica_sdk.MultiPhaseTransformFunctionFactory):
    """ Factory class """

    class LocalPhase(vertica_sdk.TransformFunctionPhase):
        """ Phase 1 """
        def getReturnType(self, server_interface, input_types, output_types):
            # sanity check
            number_of_cols = input_types.getColumnCount()
            if (number_of_cols != 1 or not input_types.getColumnType(0).isFloat()):
                raise ValueError("Function only accepts one argument (FLOAT))")

            output_types.addFloat("local_sum");
            output_types.addInt("ignored_rows");
            output_types.addInt("total_rows");

        def createTransformFunction(cls, server_interface):
            return LocalCalculation()

第二阶段 GlobalPhase 不会检查其输入,因为第一阶段已经进行了检查。与第一阶段一样,createTransformFunction 只是构造并返回对应的 TransformFunction


    class GlobalPhase(vertica_sdk.TransformFunctionPhase):
        """ Phase 2 """
        def getReturnType(self, server_interface, input_types, output_types):
            output_types.addFloat("average");
            output_types.addInt("ignored_rows");
            output_types.addInt("total_rows");

        def createTransformFunction(cls, server_interface):
            return GlobalCalculation()

在定义 TransformFunctionPhase 子类之后,工厂会将其实例化并在 getPhases() 中将其链接在一起。

    ph0Instance = LocalPhase()
    ph1Instance = GlobalPhase()

    def getPhases(cls, server_interface):
        cls.ph0Instance.setPrepass()
        phases = [cls.ph0Instance, cls.ph1Instance]
        return phases

11 - Python 示例:计数元素

以下示例会详细说明采用数组分区的 UDTF,计算分区中每个不同数组元素的计数,并将每个元素及其计数输出为行值。您可以对包含多个数组分区的表调用相应函数。

完整的源代码位于 /opt/vertica/sdk/examples/python/TransformFunctions.py 中。

加载和使用示例

加载库并创建转换函数,如下所示:

=> CREATE OR REPLACE LIBRARY TransformFunctions AS '/home/dbadmin/examples/python/TransformFunctions.py' LANGUAGE 'Python';

=> CREATE TRANSFORM FUNCTION CountElements AS LANGUAGE 'Python' NAME 'countElementsUDTFactory' LIBRARY TransformFunctions;

您可以创建一些数据,然后对其调用相应函数,例如:


=> CREATE TABLE orders (storeID int, productIDs array[int]);
CREATE TABLE

=> INSERT INTO orders VALUES
    (1, array[101, 102, 103]),
    (1, array[102, 104]),
    (1, array[101, 102, 102, 201, 203]),
    (2, array[101, 202, 203, 202, 203]),
    (2, array[203]),
    (2, array[]);
OUTPUT
--------
6
(1 row)

=> COMMIT;
COMMIT

=> SELECT storeID, CountElements(productIDs) OVER (PARTITION BY storeID) FROM orders;
storeID |       element_count
--------+---------------------------
      1 | {"element":101,"count":2}
      1 | {"element":102,"count":4}
      1 | {"element":103,"count":1}
      1 | {"element":104,"count":1}
      1 | {"element":201,"count":1}
      1 | {"element":202,"count":1}
      2 | {"element":101,"count":1}
      2 | {"element":202,"count":2}
      2 | {"element":203,"count":3}
(9 rows)

设置

所有 Python UDx 都必须导入 Vertica SDK 库:

import vertica_sdk

工厂实施

getPrototype() 方法会声明输入和输出可以是任何类型,这意味着必须在其他位置执行类型强制:


def getPrototype(self, srv_interface, arg_types, return_type):
    arg_types.addAny()
    return_type.addAny()

getReturnType() 将验证函数的唯一实参是否为数组,以及返回类型是否是具有“element”和“count”字段的行:


def getReturnType(self, srv_interface, arg_types, return_type):

    if arg_types.getColumnCount() != 1:
        srv_interface.reportError(1, 'countElements UDT should take exactly one argument')

    if not arg_types.getColumnType(0).isArrayType():
        srv_interface.reportError(2, 'Argument to countElements UDT should be an ARRAY')

    retRowFields = vertica_sdk.SizedColumnTypes.makeEmpty()
    retRowFields.addColumn(arg_types.getColumnType(0).getElementType(), 'element')
    retRowFields.addInt('count')
    return_type.addRowType(retRowFields, 'element_count')

countElementsUDTFactory 类还包含可实例化并返回转换函数的 createTransformFunction() 方法。

函数实施

使用名称分别为 arg_readerres_writerBlockReaderBlockWriter 来调用 processBlock() 方法。该函数会遍历分区中的所有输入数组,并使用字典来收集每个元素的频率。为了访问每个输入数组的元素,该方法会实例化 ArrayReader。收集元素计数后,该函数会将每个元素及其计数写入某个行中。对每个分区重复此过程。


def processPartition(self, srv_interface, arg_reader, res_writer):

    elemCounts = dict()
    # Collect element counts for entire partition
    while (True):
        if not arg_reader.isNull(0):
            arr = arg_reader.getArray(0)
            for elem in arr:
                elemCounts[elem] = elemCounts.setdefault(elem, 0) + 1

        if not arg_reader.next():
            break

    # Write at least one value for each partition
    if len(elemCounts) == 0:
        elemCounts[None] = 0

    # Write out element counts as (element, count) pairs
    for pair in elemCounts.items():
        res_writer.setRow(0, pair)
        res_writer.next()