UDSource 类

如果需要从 COPY 已不支持的源类型加载数据,您可以将 UDSource 类子类化。

UDSource 子类的每个实例从单个数据源读取数据。例如,单个数据源可以是单个文件或对 RESTful Web 应用程序执行的单个函数调用的结果。

UDSource 方法

您的 UDSource 子类必须覆盖 process()processWithMetadata()

  • process() 将原始输入流作为一个大文件读取。如果有任何错误或故障,整个加载将失败。

  • processWithMetadata() 当数据源具有以某种结构化格式(与数据有效负载分开)提供的关于记录边界的元数据时很有用。使用此接口,源除了发出数据之外,还会为每个记录发出记录长度。

    通过在每个阶段实施 processWithMetadata() 而不是 process(),您可以在整个加载堆栈中保留此记录长度元数据,从而实施更有效的解析,以在每个消息的基础上而不是每个文件或每个源的基础上从错误中恢复。 KafkaSource 和 Kafka 解析器(KafkaAvroParserKafkaJSONParserKafkaParser)会在各条 Kafka 消息无法解析时使用这种机制为拒绝每条 Kafka 消息提供支持。

此外,还可以覆盖其他 UDSource 类方法。

源执行

以下各节详细说明了每次调用用户定义的源时的执行序列。以下示例覆盖 process() 方法。

设置
COPY 在第一次调用 process() 之前调用了 setup()。使用 setup() 可执行访问数据源所需的任何设置步骤。此方法可以建立网络连接和打开文件,还可以执行类似的必需任务以使 UDSource 能够从数据源读取数据。您的对象可能在使用期间已损坏并重新创建,因此请确保您的对象可以重新启动。

处理源
COPY 将在查询执行期间重复调用 process(),以读取数据并将数据写入到作为参数传递的 DataBuffer。然后,此缓冲区会传递到第一个过滤器。

如果源已用完输入或已填满输出缓冲区,则它必须返回值 StreamState.OUTPUT_NEEDED。如果 Vertica 收到此返回值,它将再次调用该方法。此第二次调用在数据加载过程的下一个阶段已处理输出缓冲区之后进行。如果返回 StreamState.DONE,则指示已读取源中的所有数据。

用户可以取消加载操作,这样会中止读取。

分解
COPY 将在最后一次调用 process() 之后调用 destroy()。此方法可释放由 setup()process() 方法预留的任何资源,例如,setup() 方法分配的文件句柄或网络连接。

取值函数

一个源可以定义两个取值函数,即 getSize()getUri()

调用 process() 之前,COPY 可能会调用 getSize() 以估算要读取的数据的字节数。此值只是一个估算值,用于在 LOAD_STREAMS 表中指示文件大小。由于 Vertica 会在调用 setup() 之前调用该方法,因此 getSize() 不得依赖由 setup() 分配的任何资源。

此方法不应使任何资源处于打开状态。例如,请勿保存由 getSize() 打开以供 process() 方法使用的任何文件句柄,否则会耗尽可用资源,因为 Vertica 将在加载任何数据之前会对 UDSource 子类的所有实例调用 getSize()。如果正在打开许多数据源,这些打开的文件句柄可能会用完系统提供的文件句柄,从而导致没有任何其他文件句柄可用于执行实际的数据加载。

Vertica 将在执行期间调用 getUri(),以更新有关当前正在加载哪些资源的状态信息。它会返回由此 UDSource 读取的数据源的 URI。

API

UDSource API 提供了以下通过子类扩展的方法:

virtual void setup(ServerInterface &srvInterface);

virtual bool useSideChannel();

virtual StreamState process(ServerInterface &srvInterface, DataBuffer &output)=0;

virtual StreamState processWithMetadata(ServerInterface &srvInterface, DataBuffer &output, LengthBuffer &output_lengths)=0;

virtual void cancel(ServerInterface &srvInterface);

virtual void destroy(ServerInterface &srvInterface);

virtual vint getSize();

virtual std::string getUri();

UDSource API 提供了以下通过子类扩展的方法:

public void setup(ServerInterface srvInterface) throws UdfException;

public abstract StreamState process(ServerInterface srvInterface, DataBuffer output) throws UdfException;

protected void cancel(ServerInterface srvInterface);

public void destroy(ServerInterface srvInterface) throws UdfException;

public Integer getSize();

public String getUri();