SourceFactory 类

如果编写源,您还必须编写源工厂。SourceFactory 类的子类负责执行下列任务:

  • 对传递到 UDSource 的参数执行初始验证。

  • 设置 UDSource 实例执行其工作所需的任何数据结构。此信息可能包括有关哪些节点将读取哪个数据源的记录。

  • 为函数在每个主机上读取的每个数据源(或其一部分)创建 UDSource 子类的一个实例。

最简单的源工厂将为每个执行程序节点的每个数据源创建一个 UDSource 实例。您还可以在每个节点上使用多个并发 UDSource 实例。此行为称为并发加载。为了支持这两个选项,SourceFactory 拥有可创建源的方法的两个版本。您必须准确实施其中一个版本。

源工厂是单例。您的子类必须为无状态子类,没有包含数据的字段。该子类还不得修改任何全局变量。

SourceFactory 方法

SourceFactory 类定义了多个方法。您的类必须覆盖 prepareUDSources();它可以覆盖其他方法。

设置

Vertica 将在启动程序节点上调用 plan() 一次,以执行下列任务:

  • 检查用户已向 COPY 语句中的函数调用提供的参数,并提供错误消息(如果出现任何问题)。您可以通过从传递到 plan() 方法的 ServerInterface 的实例获取 ParamReader 对象来读取参数。

  • 确定群集中的哪些主机将读取该数据源。如何拆分工作取决于函数将读取的源。某些源可以跨多个主机进行拆分,例如从多个 URL 读取数据的源。其他源(例如主机文件系统上的单个本地文件)只能由单个特定主机读取。

    可以通过对 NodeSpecifyingPlanContext 对象调用 setTargetNodes() 方法来存储要读取数据源的主机的列表。此对象会传递到 plan() 方法中。

  • 存储各个主机所需的任何信息,以便处理传递到 plan() 方法的 NodeSpecifyingPlanContext 实例中的数据源。例如,您可以存储分配,用于告知每个主机要处理的数据源。plan() 方法仅在启动程序节点上运行,而 prepareUDSources() 方法则在每个读取数据源的主机上运行。因此,该对象是它们之间的唯一通信方式。

    您通过从 getWriter() 方法中获取 ParamWriter 对象在 NodeSpecifyingPlanContext 中存储数据。然后,通过对 ParamWriter 调用方法(比如 setString())写入参数。

创建源

Vertica 会在 plan() 方法已选择将数据加载到的所有主机上调用 prepareUDSources()。此调用会实例化并返回 UDSource 子类实例的列表。如果不使用并发加载,请为分配给主机进行处理的每个源返回一个 UDSource。如果要使用并发加载,请使用将 ExecutorPlanContext 作为参数的方法版本,并尽可能多地返回可供使用的源。您的工厂必须准确实施这些方法之一。

对于并发加载,您可以通过在传入的 ExecutorPlanContext 上调用 getLoadConcurrency() 来了解节点上可供运行 UDSource 实例的线程数。

定义参数

实施 getParameterTypes() 可定义源所使用的参数的名称和类型。Vertica 使用此信息对调用人员发出有关未知或缺失参数的警告。Vertica 会忽略未知参数并为缺失参数使用默认值。当您需要为函数定义类型和参数时,您不需要覆盖此方法。

请求并发加载的线程

当源工厂在执行程序节点上创建源时,默认情况下,它会为每个源创建一个线程。如果您的源可以使用多个线程,请实施 getDesiredThreads()。在调用 prepareUDSources() 之前,Vertica 会先调用此方法,因此您也可以使用它来决定要创建的源数量。返回工厂可用于源的线程数。已传入可用线程的最大数量,因此您可以将其考虑在内。方法返回的值只是一种提示,而不是保证;每个执行程序节点均可确定要分配的线程数。FilePortionSourceFactory 示例将实施此方法;请参阅 C++ 示例:并发加载

您可以允许源控制并行度,这意味着它可以通过实施 isSourceApportionable() 将单个输入拆分成多个加载流。即使此方法返回 true,也不保证源分摊该加载。然而,返回 false 表示解析器不会尝试这么做。有关详细信息,请参阅分摊加载

通常,实施 getDesiredThreads()SourceFactory 也使用分摊加载。但是,使用分摊加载不是必需的。例如,从 Kafka 流中读取的源可以使用多个线程而无需分摊。

API

SourceFactory API 提供了以下通过子类扩展的方法:

virtual void plan(ServerInterface &srvInterface, NodeSpecifyingPlanContext &planCtxt);

// must implement exactly one of prepareUDSources() or prepareUDSourcesExecutor()
virtual std::vector< UDSource * > prepareUDSources(ServerInterface &srvInterface,
            NodeSpecifyingPlanContext &planCtxt);

virtual std::vector< UDSource * > prepareUDSourcesExecutor(ServerInterface &srvInterface,
            ExecutorPlanContext &planCtxt);

virtual void getParameterType(ServerInterface &srvInterface,
            SizedColumnTypes &parameterTypes);

virtual bool isSourceApportionable();

ssize_t getDesiredThreads(ServerInterface &srvInterface,
            ExecutorPlanContext &planContext);

创建 SourceFactory 之后,您必须将其注册到 RegisterFactory 宏。

SourceFactory API 提供了以下通过子类扩展的方法:

public void plan(ServerInterface srvInterface, NodeSpecifyingPlanContext planCtxt)
    throws UdfException;

// must implement one overload of prepareUDSources()
public ArrayList< UDSource > prepareUDSources(ServerInterface srvInterface,
                NodeSpecifyingPlanContext planCtxt)
    throws UdfException;

public ArrayList< UDSource > prepareUDSources(ServerInterface srvInterface,
                ExecutorPlanContext planCtxt)
    throws UdfException;

public void getParameterType(ServerInterface srvInterface, SizedColumnTypes parameterTypes);

public boolean isSourceApportionable();

public int getDesiredThreads(ServerInterface srvInterface,
                ExecutorPlanContext planCtxt)
    throws UdfException;