SourceFactory 类
如果编写源,您还必须编写源工厂。SourceFactory
类的子类负责执行下列任务:
-
对传递到
UDSource
的参数执行初始验证。 -
设置
UDSource
实例执行其工作所需的任何数据结构。此信息可能包括有关哪些节点将读取哪个数据源的记录。 -
为函数在每个主机上读取的每个数据源(或其一部分)创建
UDSource
子类的一个实例。
最简单的源工厂将为每个执行程序节点的每个数据源创建一个 UDSource
实例。您还可以在每个节点上使用多个并发 UDSource
实例。此行为称为并发加载。为了支持这两个选项,SourceFactory
拥有可创建源的方法的两个版本。您必须准确实施其中一个版本。
源工厂是单例。您的子类必须为无状态子类,没有包含数据的字段。该子类还不得修改任何全局变量。
SourceFactory 方法
SourceFactory
类定义了多个方法。您的类必须覆盖 prepareUDSources()
;它可以覆盖其他方法。
设置
Vertica 将在启动程序节点上调用 plan()
一次,以执行下列任务:
-
检查用户已向 COPY 语句中的函数调用提供的参数,并提供错误消息(如果出现任何问题)。您可以通过从传递到
plan()
方法的ServerInterface
的实例获取ParamReader
对象来读取参数。 -
确定群集中的哪些主机将读取该数据源。如何拆分工作取决于函数将读取的源。某些源可以跨多个主机进行拆分,例如从多个 URL 读取数据的源。其他源(例如主机文件系统上的单个本地文件)只能由单个特定主机读取。
可以通过对
NodeSpecifyingPlanContext
对象调用setTargetNodes()
方法来存储要读取数据源的主机的列表。此对象会传递到plan()
方法中。 -
存储各个主机所需的任何信息,以便处理传递到
plan()
方法的NodeSpecifyingPlanContext
实例中的数据源。例如,您可以存储分配,用于告知每个主机要处理的数据源。plan()
方法仅在启动程序节点上运行,而prepareUDSources()
方法则在每个读取数据源的主机上运行。因此,该对象是它们之间的唯一通信方式。您通过从
getWriter()
方法中获取ParamWriter
对象在NodeSpecifyingPlanContext
中存储数据。然后,通过对ParamWriter
调用方法(比如setString()
)写入参数。注意
ParamWriter
仅提供存储简单数据类型的能力。对于复杂的类型,您必须以某种方式将数据序列化并将其作为字符串或长字符串存储。
创建源
Vertica 会在 plan()
方法已选择将数据加载到的所有主机上调用 prepareUDSources()
。此调用会实例化并返回 UDSource
子类实例的列表。如果不使用并发加载,请为分配给主机进行处理的每个源返回一个 UDSource
。如果要使用并发加载,请使用将 ExecutorPlanContext
作为参数的方法版本,并尽可能多地返回可供使用的源。您的工厂必须准确实施这些方法之一。
注意
在 C++ API 中,支持并发加载的函数将被命名为prepareUDSourcesExecutor()
。在 Java API 中,该类提供了 prepareUDSources()
的两个过载。
对于并发加载,您可以通过在传入的 ExecutorPlanContext
上调用 getLoadConcurrency()
来了解节点上可供运行 UDSource
实例的线程数。
定义参数
实施 getParameterTypes()
可定义源所使用的参数的名称和类型。Vertica 使用此信息对调用人员发出有关未知或缺失参数的警告。Vertica 会忽略未知参数并为缺失参数使用默认值。当您需要为函数定义类型和参数时,您不需要覆盖此方法。
请求并发加载的线程
当源工厂在执行程序节点上创建源时,默认情况下,它会为每个源创建一个线程。如果您的源可以使用多个线程,请实施 getDesiredThreads()
。在调用 prepareUDSources()
之前,Vertica 会先调用此方法,因此您也可以使用它来决定要创建的源数量。返回工厂可用于源的线程数。已传入可用线程的最大数量,因此您可以将其考虑在内。方法返回的值只是一种提示,而不是保证;每个执行程序节点均可确定要分配的线程数。FilePortionSourceFactory
示例将实施此方法;请参阅 C++ 示例:并发加载。
您可以允许源控制并行度,这意味着它可以通过实施 isSourceApportionable()
将单个输入拆分成多个加载流。即使此方法返回 true
,也不保证源会分摊该加载。然而,返回 false
表示解析器不会尝试这么做。有关详细信息,请参阅分摊加载。
通常,实施 getDesiredThreads()
的 SourceFactory
也使用分摊加载。但是,使用分摊加载不是必需的。例如,从 Kafka 流中读取的源可以使用多个线程而无需分摊。
API
SourceFactory API 提供了以下通过子类扩展的方法:
virtual void plan(ServerInterface &srvInterface, NodeSpecifyingPlanContext &planCtxt);
// must implement exactly one of prepareUDSources() or prepareUDSourcesExecutor()
virtual std::vector< UDSource * > prepareUDSources(ServerInterface &srvInterface,
NodeSpecifyingPlanContext &planCtxt);
virtual std::vector< UDSource * > prepareUDSourcesExecutor(ServerInterface &srvInterface,
ExecutorPlanContext &planCtxt);
virtual void getParameterType(ServerInterface &srvInterface,
SizedColumnTypes ¶meterTypes);
virtual bool isSourceApportionable();
ssize_t getDesiredThreads(ServerInterface &srvInterface,
ExecutorPlanContext &planContext);
创建 SourceFactory
之后,您必须将其注册到 RegisterFactory
宏。
SourceFactory API 提供了以下通过子类扩展的方法:
public void plan(ServerInterface srvInterface, NodeSpecifyingPlanContext planCtxt)
throws UdfException;
// must implement one overload of prepareUDSources()
public ArrayList< UDSource > prepareUDSources(ServerInterface srvInterface,
NodeSpecifyingPlanContext planCtxt)
throws UdfException;
public ArrayList< UDSource > prepareUDSources(ServerInterface srvInterface,
ExecutorPlanContext planCtxt)
throws UdfException;
public void getParameterType(ServerInterface srvInterface, SizedColumnTypes parameterTypes);
public boolean isSourceApportionable();
public int getDesiredThreads(ServerInterface srvInterface,
ExecutorPlanContext planCtxt)
throws UdfException;