使用用户定义的源,您可以使用未内置在 Vertica 中的方法来处理数据源。例如,您可以编写用户定义的源,以使用 cURL 访问来自 HTTP 源的数据。虽然给定的 COPY 语句只能指定一个用户定义的源语句,但源函数本身可以从多个源中拉取数据。
UDSource
类将从外部源获取数据。此类可从输入流读取数据,并生成输出流以进行过滤和解析。如果实施 UDSource
,则还必须实施相应的
SourceFactory
。
使用用户定义的源,您可以使用未内置在 Vertica 中的方法来处理数据源。例如,您可以编写用户定义的源,以使用 cURL 访问来自 HTTP 源的数据。虽然给定的 COPY 语句只能指定一个用户定义的源语句,但源函数本身可以从多个源中拉取数据。
UDSource
类将从外部源获取数据。此类可从输入流读取数据,并生成输出流以进行过滤和解析。如果实施 UDSource
,则还必须实施相应的
SourceFactory
。
如果需要从 COPY 已不支持的源类型加载数据,您可以将 UDSource
类子类化。
UDSource
子类的每个实例从单个数据源读取数据。例如,单个数据源可以是单个文件或对 RESTful Web 应用程序执行的单个函数调用的结果。
您的 UDSource
子类必须覆盖 process()
或 processWithMetadata()
:
processWithMetadata()
仅适用于以 C++ 编程语言编写的用户定义扩展 (UDx)。
process()
将原始输入流作为一个大文件读取。如果有任何错误或故障,整个加载将失败。
processWithMetadata()
当数据源具有以某种结构化格式(与数据有效负载分开)提供的关于记录边界的元数据时很有用。使用此接口,源除了发出数据之外,还会为每个记录发出记录长度。
通过在每个阶段实施 processWithMetadata()
而不是 process()
,您可以在整个加载堆栈中保留此记录长度元数据,从而实施更有效的解析,以在每个消息的基础上而不是每个文件或每个源的基础上从错误中恢复。 KafkaSource 和 Kafka 解析器(KafkaAvroParser、KafkaJSONParser 和 KafkaParser)会在各条 Kafka 消息无法解析时使用这种机制为拒绝每条 Kafka 消息提供支持。
processWithMetadata()
,您必须覆盖 useSideChannel()
以返回 true
。
此外,还可以覆盖其他 UDSource
类方法。
以下各节详细说明了每次调用用户定义的源时的执行序列。以下示例覆盖 process()
方法。
设置
COPY 在第一次调用 process()
之前调用了 setup()
。使用 setup()
可执行访问数据源所需的任何设置步骤。此方法可以建立网络连接和打开文件,还可以执行类似的必需任务以使 UDSource
能够从数据源读取数据。您的对象可能在使用期间已损坏并重新创建,因此请确保您的对象可以重新启动。
处理源
COPY 将在查询执行期间重复调用 process()
,以读取数据并将数据写入到作为参数传递的 DataBuffer
。然后,此缓冲区会传递到第一个过滤器。
如果源已用完输入或已填满输出缓冲区,则它必须返回值 StreamState.OUTPUT_NEEDED
。如果 Vertica 收到此返回值,它将再次调用该方法。此第二次调用在数据加载过程的下一个阶段已处理输出缓冲区之后进行。如果返回 StreamState.DONE
,则指示已读取源中的所有数据。
用户可以取消加载操作,这样会中止读取。
分解
COPY 将在最后一次调用 process()
之后调用 destroy()
。此方法可释放由 setup()
或 process()
方法预留的任何资源,例如,setup()
方法分配的文件句柄或网络连接。
一个源可以定义两个取值函数,即 getSize()
和 getUri()
。
调用 process()
之前,COPY 可能会调用 getSize()
以估算要读取的数据的字节数。此值只是一个估算值,用于在 LOAD_STREAMS 表中指示文件大小。由于 Vertica 会在调用 setup()
之前调用该方法,因此 getSize()
不得依赖由 setup()
分配的任何资源。
此方法不应使任何资源处于打开状态。例如,请勿保存由 getSize()
打开以供 process()
方法使用的任何文件句柄,否则会耗尽可用资源,因为 Vertica 将在加载任何数据之前会对 UDSource
子类的所有实例调用 getSize()
。如果正在打开许多数据源,这些打开的文件句柄可能会用完系统提供的文件句柄,从而导致没有任何其他文件句柄可用于执行实际的数据加载。
Vertica 将在执行期间调用 getUri()
,以更新有关当前正在加载哪些资源的状态信息。它会返回由此 UDSource
读取的数据源的 URI。
UDSource API 提供了以下通过子类扩展的方法:
virtual void setup(ServerInterface &srvInterface);
virtual bool useSideChannel();
virtual StreamState process(ServerInterface &srvInterface, DataBuffer &output)=0;
virtual StreamState processWithMetadata(ServerInterface &srvInterface, DataBuffer &output, LengthBuffer &output_lengths)=0;
virtual void cancel(ServerInterface &srvInterface);
virtual void destroy(ServerInterface &srvInterface);
virtual vint getSize();
virtual std::string getUri();
UDSource API 提供了以下通过子类扩展的方法:
public void setup(ServerInterface srvInterface) throws UdfException;
public abstract StreamState process(ServerInterface srvInterface, DataBuffer output) throws UdfException;
protected void cancel(ServerInterface srvInterface);
public void destroy(ServerInterface srvInterface) throws UdfException;
public Integer getSize();
public String getUri();
如果编写源,您还必须编写源工厂。SourceFactory
类的子类负责执行下列任务:
对传递到 UDSource
的参数执行初始验证。
设置 UDSource
实例执行其工作所需的任何数据结构。此信息可能包括有关哪些节点将读取哪个数据源的记录。
为函数在每个主机上读取的每个数据源(或其一部分)创建 UDSource
子类的一个实例。
最简单的源工厂将为每个执行程序节点的每个数据源创建一个 UDSource
实例。您还可以在每个节点上使用多个并发 UDSource
实例。此行为称为并发加载。为了支持这两个选项,SourceFactory
拥有可创建源的方法的两个版本。您必须准确实施其中一个版本。
源工厂是单例。您的子类必须为无状态子类,没有包含数据的字段。该子类还不得修改任何全局变量。
SourceFactory
类定义了多个方法。您的类必须覆盖 prepareUDSources()
;它可以覆盖其他方法。
Vertica 将在启动程序节点上调用 plan()
一次,以执行下列任务:
检查用户已向 COPY 语句中的函数调用提供的参数,并提供错误消息(如果出现任何问题)。您可以通过从传递到 plan()
方法的 ServerInterface
的实例获取 ParamReader
对象来读取参数。
确定群集中的哪些主机将读取该数据源。如何拆分工作取决于函数将读取的源。某些源可以跨多个主机进行拆分,例如从多个 URL 读取数据的源。其他源(例如主机文件系统上的单个本地文件)只能由单个特定主机读取。
可以通过对 NodeSpecifyingPlanContext
对象调用 setTargetNodes()
方法来存储要读取数据源的主机的列表。此对象会传递到 plan()
方法中。
存储各个主机所需的任何信息,以便处理传递到 plan()
方法的 NodeSpecifyingPlanContext
实例中的数据源。例如,您可以存储分配,用于告知每个主机要处理的数据源。plan()
方法仅在启动程序节点上运行,而 prepareUDSources()
方法则在每个读取数据源的主机上运行。因此,该对象是它们之间的唯一通信方式。
您通过从 getWriter()
方法中获取 ParamWriter
对象在 NodeSpecifyingPlanContext
中存储数据。然后,通过对 ParamWriter
调用方法(比如 setString()
)写入参数。
ParamWriter
仅提供存储简单数据类型的能力。对于复杂的类型,您必须以某种方式将数据序列化并将其作为字符串或长字符串存储。
Vertica 会在 plan()
方法已选择将数据加载到的所有主机上调用 prepareUDSources()
。此调用会实例化并返回 UDSource
子类实例的列表。如果不使用并发加载,请为分配给主机进行处理的每个源返回一个 UDSource
。如果要使用并发加载,请使用将 ExecutorPlanContext
作为参数的方法版本,并尽可能多地返回可供使用的源。您的工厂必须准确实施这些方法之一。
prepareUDSourcesExecutor()
。在 Java API 中,该类提供了 prepareUDSources()
的两个过载。
对于并发加载,您可以通过在传入的 ExecutorPlanContext
上调用 getLoadConcurrency()
来了解节点上可供运行 UDSource
实例的线程数。
实施 getParameterTypes()
可定义源所使用的参数的名称和类型。Vertica 使用此信息对调用人员发出有关未知或缺失参数的警告。Vertica 会忽略未知参数并为缺失参数使用默认值。当您需要为函数定义类型和参数时,您不需要覆盖此方法。
当源工厂在执行程序节点上创建源时,默认情况下,它会为每个源创建一个线程。如果您的源可以使用多个线程,请实施 getDesiredThreads()
。在调用 prepareUDSources()
之前,Vertica 会先调用此方法,因此您也可以使用它来决定要创建的源数量。返回工厂可用于源的线程数。已传入可用线程的最大数量,因此您可以将其考虑在内。方法返回的值只是一种提示,而不是保证;每个执行程序节点均可确定要分配的线程数。FilePortionSourceFactory
示例将实施此方法;请参阅 C++ 示例:并发加载。
您可以允许源控制并行度,这意味着它可以通过实施 isSourceApportionable()
将单个输入拆分成多个加载流。即使此方法返回 true
,也不保证源会分摊该加载。然而,返回 false
表示解析器不会尝试这么做。有关详细信息,请参阅分摊加载。
通常,实施 getDesiredThreads()
的 SourceFactory
也使用分摊加载。但是,使用分摊加载不是必需的。例如,从 Kafka 流中读取的源可以使用多个线程而无需分摊。
SourceFactory API 提供了以下通过子类扩展的方法:
virtual void plan(ServerInterface &srvInterface, NodeSpecifyingPlanContext &planCtxt);
// must implement exactly one of prepareUDSources() or prepareUDSourcesExecutor()
virtual std::vector< UDSource * > prepareUDSources(ServerInterface &srvInterface,
NodeSpecifyingPlanContext &planCtxt);
virtual std::vector< UDSource * > prepareUDSourcesExecutor(ServerInterface &srvInterface,
ExecutorPlanContext &planCtxt);
virtual void getParameterType(ServerInterface &srvInterface,
SizedColumnTypes ¶meterTypes);
virtual bool isSourceApportionable();
ssize_t getDesiredThreads(ServerInterface &srvInterface,
ExecutorPlanContext &planContext);
创建 SourceFactory
之后,您必须将其注册到 RegisterFactory
宏。
SourceFactory API 提供了以下通过子类扩展的方法:
public void plan(ServerInterface srvInterface, NodeSpecifyingPlanContext planCtxt)
throws UdfException;
// must implement one overload of prepareUDSources()
public ArrayList< UDSource > prepareUDSources(ServerInterface srvInterface,
NodeSpecifyingPlanContext planCtxt)
throws UdfException;
public ArrayList< UDSource > prepareUDSources(ServerInterface srvInterface,
ExecutorPlanContext planCtxt)
throws UdfException;
public void getParameterType(ServerInterface srvInterface, SizedColumnTypes parameterTypes);
public boolean isSourceApportionable();
public int getDesiredThreads(ServerInterface srvInterface,
ExecutorPlanContext planCtxt)
throws UdfException;
使用 CurlSource 示例,您可以使用 cURL 通过 HTTP 打开和读入文件。该示例作为以下内容的一部分提供:/opt/vertica/sdk/examples/SourceFunctions/cURL.cpp
。
此示例使用位于 /opt/vertica/sdk/examples/HelperLibraries/
中的 helper 库。
CurlSource 按区块加载数据。如果解析器遇到 EndOfFile 标记,则 process()
方法将返回 DONE。否则,该方法将返回 OUTPUT_NEEDED 并处理其他数据区块。helper 库中包含的函数(例如 url_fread()
和 url_fopen()
)基于随 libcurl 库附带提供的示例。有关示例,请访问 http://curl.haxx.se/libcurl/c/fopen.html。
setup()
函数可打开文件句柄,而 destroy()
函数可关闭文件句柄。它们都使用 helper 库中的函数。
class CurlSource : public UDSource {private:
URL_FILE *handle;
std::string url;
virtual StreamState process(ServerInterface &srvInterface, DataBuffer &output) {
output.offset = url_fread(output.buf, 1, output.size, handle);
return url_feof(handle) ? DONE : OUTPUT_NEEDED;
}
public:
CurlSource(std::string url) : url(url) {}
void setup(ServerInterface &srvInterface) {
handle = url_fopen(url.c_str(),"r");
}
void destroy(ServerInterface &srvInterface) {
url_fclose(handle);
}
};
CurlSourceFactory
可生成 CurlSource
实例。
class CurlSourceFactory : public SourceFactory {public:
virtual void plan(ServerInterface &srvInterface,
NodeSpecifyingPlanContext &planCtxt) {
std::vector<std::string> args = srvInterface.getParamReader().getParamNames();
/* Check parameters */
if (args.size() != 1 || find(args.begin(), args.end(), "url") == args.end()) {
vt_report_error(0, "You must provide a single URL.");
}
/* Populate planData */
planCtxt.getWriter().getStringRef("url").copy(
srvInterface.getParamReader().getStringRef("url"));
/* Assign Nodes */
std::vector<std::string> executionNodes = planCtxt.getClusterNodes();
while (executionNodes.size() > 1) executionNodes.pop_back();
// Only run on the first node in the list.
planCtxt.setTargetNodes(executionNodes);
}
virtual std::vector<UDSource*> prepareUDSources(ServerInterface &srvInterface,
NodeSpecifyingPlanContext &planCtxt) {
std::vector<UDSource*> retVal;
retVal.push_back(vt_createFuncObj(srvInterface.allocator, CurlSource,
planCtxt.getReader().getStringRef("url").str()));
return retVal;
}
virtual void getParameterType(ServerInterface &srvInterface,
SizedColumnTypes ¶meterTypes) {
parameterTypes.addVarchar(65000, "url");
}
};
RegisterFactory(CurlSourceFactory);
FilePortionSource
示例演示了如何使用并发加载。此示例是对 FileSource
示例进行的改进。每个输入文件都会拆分成多个部分并分发到 FilePortionSource
实例。源接受将输入分成多个部分所依据的偏移量列表;如果未提供偏移量,源将动态拆分输入。
并发加载将在工厂中进行处理,所以本文讨论的重点是 FilePortionSourceFactory
。该示例的完整代码位于 /opt/vertica/sdk/examples/ApportionLoadFunctions
中。该分发还包括此示例的 Java 版本。
按如下所示加载并使用 FilePortionSource
示例。
=> CREATE LIBRARY FilePortionLib AS '/home/dbadmin/FP.so';
=> CREATE SOURCE FilePortionSource AS LANGUAGE 'C++'
-> NAME 'FilePortionSourceFactory' LIBRARY FilePortionLib;
=> COPY t WITH SOURCE FilePortionSource(file='g1/*.dat', nodes='initiator,e0,e1', offsets = '0,380000,820000');
=> COPY t WITH SOURCE FilePortionSource(file='g2/*.dat', nodes='e0,e1,e2', local_min_portion_size = 2097152);
并发加载会在两个位置影响源工厂:getDesiredThreads()
和 prepareUDSourcesExecutor()
。
getDesiredThreads()
成员函数可确定要请求的线程数。Vertica 会在调用 prepareUDSourcesExecutor()
之前在每个执行程序节点上调用此成员函数。
该函数会先开始将输入文件路径(可能为 glob)分成多个单独的路径。本讨论省略了这些细节。如果未使用分摊加载,则该函数为每个文件分配一个源。
virtual ssize_t getDesiredThreads(ServerInterface &srvInterface,
ExecutorPlanContext &planCtxt) {
const std::string filename = srvInterface.getParamReader().getStringRef("file").str();
std::vector<std::string> paths;
// expand the glob - at least one thread per source.
...
// figure out how to assign files to sources
const std::string nodeName = srvInterface.getCurrentNodeName();
const size_t nodeId = planCtxt.getWriter().getIntRef(nodeName);
const size_t numNodes = planCtxt.getTargetNodes().size();
if (!planCtxt.canApportionSource()) {
/* no apportioning, so the number of files is the final number of sources */
std::vector<std::string> *expanded =
vt_createFuncObject<std::vector<std::string> >(srvInterface.allocator, paths);
/* save expanded paths so we don't have to compute expansion again */
planCtxt.getWriter().setPointer("expanded", expanded);
return expanded->size();
}
// ...
如果可以分摊源,则 getDesiredThreads()
会使用作为实参传递给工厂的偏移量,以将文件拆分成多个部分。然后,它会将这些部分分配给可用节点。此函数实际上不会直接分配源;完成这项工作是为了确定要请求的线程数。
else if (srvInterface.getParamReader().containsParameter("offsets")) {
// if the offsets are specified, then we will have a fixed number of portions per file.
// Round-robin assign offsets to nodes.
// ...
/* Construct the portions that this node will actually handle.
* This isn't changing (since the offset assignments are fixed),
* so we'll build the Portion objects now and make them available
* to prepareUDSourcesExecutor() by putting them in the ExecutorContext.
*
* We don't know the size of the last portion, since it depends on the file
* size. Rather than figure it out here we will indicate it with -1 and
* defer that to prepareUDSourcesExecutor().
*/
std::vector<Portion> *portions =
vt_createFuncObject<std::vector<Portion>>(srvInterface.allocator);
for (std::vector<size_t>::const_iterator offset = offsets.begin();
offset != offsets.end(); ++offset) {
Portion p(*offset);
p.is_first_portion = (offset == offsets.begin());
p.size = (offset + 1 == offsets.end() ? -1 : (*(offset + 1) - *offset));
if ((offset - offsets.begin()) % numNodes == nodeId) {
portions->push_back(p);
srvInterface.log("FilePortionSource: assigning portion %ld: [offset = %lld, size = %lld]",
offset - offsets.begin(), p.offset, p.size);
}
}
该函数现在具有所有部分,因此可以知道部分的数量:
planCtxt.getWriter().setPointer("portions", portions);
/* total number of threads we want is the number of portions per file, which is fixed */
return portions->size() * expanded->size();
} // end of "offsets" parameter
如果未提供偏移量,则该函数会将文件动态拆分成多个部分,每个线程一个部分。本讨论省略了此计算过程的细节。请求的线程数多于可用线程数没有意义,因此该函数会对用 PlanContext
(函数的实参)调用 getMaxAllowedThreads()
来设置上限:
if (portions->size() >= planCtxt.getMaxAllowedThreads()) {
return paths.size();
}
有关此函数如何将文件拆分成多个部分的详细信息,请参阅完整示例。
此函数使用 vt_createFuncObject
模板来创建对象。Vertica 会调用使用此宏创建的返回对象的析构函数,但它不会调用其他对象(如向量)的析构函数。您必须自己调用这些析构函数以避免内存泄漏。在此示例中,这些调用是在 prepareUDSourcesExecutor()
中进行的。
prepareUDSourcesExecutor()
成员函数(如 getDesiredThreads()
)包含单独的代码块,具体取决于是否提供了偏移量。在这两种情况下,该函数都会将输入拆分成多个部分并为它们创建 UDSource
实例。
如果使用偏移量调用函数,则 prepareUDSourcesExecutor()
会调用 prepareCustomizedPortions()
。此函数如下所示。
/* prepare portions as determined via the "offsets" parameter */
void prepareCustomizedPortions(ServerInterface &srvInterface,
ExecutorPlanContext &planCtxt,
std::vector<UDSource *> &sources,
const std::vector<std::string> &expandedPaths,
std::vector<Portion> &portions) {
for (std::vector<std::string>::const_iterator filename = expandedPaths.begin();
filename != expandedPaths.end(); ++filename) {
/*
* the "portions" vector contains the portions which were generated in
* "getDesiredThreads"
*/
const size_t fileSize = getFileSize(*filename);
for (std::vector<Portion>::const_iterator portion = portions.begin();
portion != portions.end(); ++portion) {
Portion fportion(*portion);
if (fportion.size == -1) {
/* as described above, this means from the offset to the end */
fportion.size = fileSize - portion->offset;
sources.push_back(vt_createFuncObject<FilePortionSource>(srvInterface.allocator,
*filename, fportion));
} else if (fportion.size > 0) {
sources.push_back(vt_createFuncObject<FilePortionSource>(srvInterface.allocator,
*filename, fportion));
}
}
}
}
如果未使用偏移量调用 prepareUDSourcesExecutor()
,则它必须决定要创建的部分数。
基本规则是每个源使用一个部分。但是,如果有额外的线程可用,该函数会将输入拆分成更多部分,以便源可以同时处理它们。然后,prepareUDSourcesExecutor()
会调用 prepareGeneratedPortions()
以创建这些部分。该函数会先开始在计划上下文中调用 getLoadConcurrency()
以找到可用的线程数。
void prepareGeneratedPortions(ServerInterface &srvInterface,
ExecutorPlanContext &planCtxt,
std::vector<UDSource *> &sources,
std::map<std::string, Portion> initialPortions) {
if ((ssize_t) initialPortions.size() >= planCtxt.getLoadConcurrency()) {
/* all threads will be used, don't bother splitting into portions */
for (std::map<std::string, Portion>::const_iterator file = initialPortions.begin();
file != initialPortions.end(); ++file) {
sources.push_back(vt_createFuncObject<FilePortionSource>(srvInterface.allocator,
file->first, file->second));
} // for
return;
} // if
// Now we can split files to take advantage of potentially-unused threads.
// First sort by size (descending), then we will split the largest first.
// details elided...
}
有关此示例的完整实施,请参阅源代码。
本节中所示的示例是一个名为 FileSource
的简单 UDL 源函数,此函数可加载存储在主机文件系统上的文件中的数据(类似于标准 COPY 语句)。要调用 FileSource
,您必须提供一个名为 file
的参数,并且该参数必须包含主机文件系统上的一个或多个文件的绝对路径。您可以使用逗号分隔列表格式指定多个文件。
FileSource
函数还接受一个名为 nodes
的可选参数,此可选参数指示哪些节点应加载文件。如果未提供此参数,则在默认情况下,函数仅在启动程序节点上加载数据。由于此示例是一个简单示例,因此节点仅加载自己的文件系统中的文件。file 参数中的任何文件必须存在于 nodes 参数中的所有主机上。FileSource
UDSource 会尝试将 file
参数中的所有文件加载到 nodes
参数中的所有主机上。
可以使用以下 Python 脚本生成文件并将这些文件分发给 Vertica 群集中的主机。使用这些文件,您可以试验示例 UDSource
函数。要运行此函数,您必须能够进行无密码 SSH 登录,以将文件复制到其他主机。因此,您必须使用数据库管理员帐户在数据库主机之一上运行该脚本。
#!/usr/bin/python
# Save this file as UDLDataGen.py
import string
import random
import sys
import os
# Read in the dictionary file to provide random words. Assumes the words
# file is located in /usr/share/dict/words
wordFile = open("/usr/share/dict/words")
wordDict = []
for line in wordFile:
if len(line) > 6:
wordDict.append(line.strip())
MAXSTR = 4 # Maximum number of words to concatentate
NUMROWS = 1000 # Number of rows of data to generate
#FILEPATH = '/tmp/UDLdata.txt' # Final filename to use for UDL source
TMPFILE = '/tmp/UDLtemp.txt' # Temporary filename.
# Generate a random string by concatenating several words together. Max
# number of words set by MAXSTR
def randomWords():
words = [random.choice(wordDict) for n in xrange(random.randint(1, MAXSTR))]
sentence = " ".join(words)
return sentence
# Create a temporary data file that will be moved to a node. Number of
# rows for the file is set by NUMROWS. Adds the name of the node which will
# get the file, to show which node loaded the data.
def generateFile(node):
outFile = open(TMPFILE, 'w')
for line in xrange(NUMROWS):
outFile.write('{0}|{1}|{2}\n'.format(line,randomWords(),node))
outFile.close()
# Copy the temporary file to a node. Only works if passwordless SSH login
# is enabled, which it is for the database administrator account on
# Vertica hosts.
def copyFile(fileName,node):
os.system('scp "%s" "%s:%s"' % (TMPFILE, node, fileName) )
# Loop through the comma-separated list of nodes given in the first
# parameter, creating and copying data files whose full comma-separated
# paths are passed in the second parameter
for node in [x.strip() for x in sys.argv[1].split(',')]:
for fileName in [y.strip() for y in sys.argv[2].split(',')]:
print "generating file", fileName, "for", node
generateFile(node)
print "Copying file to",node
copyFile(fileName,node)
您可以调用该脚本,并为其提供要接收文件的主机的逗号分隔列表和要生成的文件的绝对路径的逗号分隔列表。例如:
python UDLDataGen.py v_vmart_node0001,v_vmart_node0002,v_vmart_node0003 /tmp/UDLdata01.txt,/tmp/UDLdata02.txt,\
UDLdata03.txt
该脚本将生成包含一千个行(各列用管道字符 (|) 分隔)的文件。这些列包含一个索引值、一组随机词以及为其生成了文件的节点,如以下输出示例所示:
0|megabits embanks|v_vmart_node0001
1|unneatly|v_vmart_node0001
2|self-precipitation|v_vmart_node0001
3|antihistamine scalados Vatter|v_vmart_node0001
按如下所示加载并使用 FileSource
UDSource:
=> --Load library and create the source function
=> CREATE LIBRARY JavaLib AS '/home/dbadmin/JavaUDlLib.jar'
-> LANGUAGE 'JAVA';
CREATE LIBRARY
=> CREATE SOURCE File as LANGUAGE 'JAVA' NAME
-> 'com.mycompany.UDL.FileSourceFactory' LIBRARY JavaLib;
CREATE SOURCE FUNCTION
=> --Create a table to hold the data loaded from files
=> CREATE TABLE t (i integer, text VARCHAR, node VARCHAR);
CREATE TABLE
=> -- Copy a single file from the currently host using the FileSource
=> COPY t SOURCE File(file='/tmp/UDLdata01.txt');
Rows Loaded
-------------
1000
(1 row)
=> --See some of what got loaded.
=> SELECT * FROM t WHERE i < 5 ORDER BY i;
i | text | node
---+-------------------------------+-----------------
0 | megabits embanks | v_vmart_node0001
1 | unneatly | v_vmart_node0001
2 | self-precipitation | v_vmart_node0001
3 | antihistamine scalados Vatter | v_vmart_node0001
4 | fate-menaced toilworn | v_vmart_node0001
(5 rows)
=> TRUNCATE TABLE t;
TRUNCATE TABLE
=> -- Now load a file from three hosts. All of these hosts must have a file
=> -- named /tmp/UDLdata01.txt, each with different data
=> COPY t SOURCE File(file='/tmp/UDLdata01.txt',
-> nodes='v_vmart_node0001,v_vmart_node0002,v_vmart_node0003');
Rows Loaded
-------------
3000
(1 row)
=> --Now see what has been loaded
=> SELECT * FROM t WHERE i < 5 ORDER BY i,node ;
i | text | node
---+-------------------------------------------------+--------
0 | megabits embanks | v_vmart_node0001
0 | nimble-eyed undupability frowsier | v_vmart_node0002
0 | Circean nonrepellence nonnasality | v_vmart_node0003
1 | unneatly | v_vmart_node0001
1 | floatmaker trabacolos hit-in | v_vmart_node0002
1 | revelrous treatableness Halleck | v_vmart_node0003
2 | self-precipitation | v_vmart_node0001
2 | whipcords archipelagic protodonatan copycutter | v_vmart_node0002
2 | Paganalian geochemistry short-shucks | v_vmart_node0003
3 | antihistamine scalados Vatter | v_vmart_node0001
3 | swordweed touristical subcommanders desalinized | v_vmart_node0002
3 | batboys | v_vmart_node0003
4 | fate-menaced toilworn | v_vmart_node0001
4 | twice-wanted cirrocumulous | v_vmart_node0002
4 | doon-head-clock | v_vmart_node0003
(15 rows)
=> TRUNCATE TABLE t;
TRUNCATE TABLE
=> --Now copy from several files on several hosts
=> COPY t SOURCE File(file='/tmp/UDLdata01.txt,/tmp/UDLdata02.txt,/tmp/UDLdata03.txt'
-> ,nodes='v_vmart_node0001,v_vmart_node0002,v_vmart_node0003');
Rows Loaded
-------------
9000
(1 row)
=> SELECT * FROM t WHERE i = 0 ORDER BY node ;
i | text | node
---+---------------------------------------------+--------
0 | Awolowo Mirabilis D'Amboise | v_vmart_node0001
0 | sortieing Divisionism selfhypnotization | v_vmart_node0001
0 | megabits embanks | v_vmart_node0001
0 | nimble-eyed undupability frowsier | v_vmart_node0002
0 | thiaminase hieroglypher derogated soilborne | v_vmart_node0002
0 | aurigraphy crocket stenocranial | v_vmart_node0002
0 | Khulna pelmets | v_vmart_node0003
0 | Circean nonrepellence nonnasality | v_vmart_node0003
0 | matterate protarsal | v_vmart_node0003
(9 rows)
以下代码显示了从主机文件系统读取文件的 FileSource
类的源。FileSourceFactory.prepareUDSources()
所调用的构造函数可获取包含要读取的数据的文件的绝对文件。setup()
方法可打开文件,而 destroy()
方法可关闭文件。process()
方法可读取文件中的数据并将数据写入到缓冲区(由作为参数传递的 DataBuffer
类的实例提供)。如果读取操作已将输出缓冲区填满,则该方法将返回 OUTPUT_NEEDED
。此值可指示 Vertica 在加载的下一个阶段已处理输出缓冲区之后再次调用该方法。如果读取操作未将输出缓冲区填满,则 process()
将返回 DONE,以指示已完成对数据源的处理。
package com.mycompany.UDL;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.io.RandomAccessFile;
import com.vertica.sdk.DataBuffer;
import com.vertica.sdk.ServerInterface;
import com.vertica.sdk.State.StreamState;
import com.vertica.sdk.UDSource;
import com.vertica.sdk.UdfException;
public class FileSource extends UDSource {
private String filename; // The file for this UDSource to read
private RandomAccessFile reader; // handle to read from file
// The constructor just stores the absolute filename of the file it will
// read.
public FileSource(String filename) {
super();
this.filename = filename;
}
// Called before Vertica starts requesting data from the data source.
// In this case, setup needs to open the file and save to the reader
// property.
@Override
public void setup(ServerInterface srvInterface ) throws UdfException{
try {
reader = new RandomAccessFile(new File(filename), "r");
} catch (FileNotFoundException e) {
// In case of any error, throw a UDfException. This will terminate
// the data load.
String msg = e.getMessage();
throw new UdfException(0, msg);
}
}
// Called after data has been loaded. In this case, close the file handle.
@Override
public void destroy(ServerInterface srvInterface ) throws UdfException {
if (reader != null) {
try {
reader.close();
} catch (IOException e) {
String msg = e.getMessage();
throw new UdfException(0, msg);
}
}
}
@Override
public StreamState process(ServerInterface srvInterface, DataBuffer output)
throws UdfException {
// Read up to the size of the buffer provided in the DataBuffer.buf
// property. Here we read directly from the file handle into the
// buffer.
long offset;
try {
offset = reader.read(output.buf,output.offset,
output.buf.length-output.offset);
} catch (IOException e) {
// Throw an exception in case of any errors.
String msg = e.getMessage();
throw new UdfException(0, msg);
}
// Update the number of bytes processed so far by the data buffer.
output.offset +=offset;
// See end of data source has been reached, or less data was read
// than can fit in the buffer
if(offset == -1 || offset < output.buf.length) {
// No more data to read.
return StreamState.DONE;
}else{
// Tell Vertica to call again when buffer has been emptied
return StreamState.OUTPUT_NEEDED;
}
}
}
以下代码是 Java UDx 支持包中提供的示例 Java UDsource 函数的修改版本。可以在 /opt/vertica/sdk/examples/JavaUDx/UDLFuctions/com/vertica/JavaLibs/FileSourceFactory.java
中找到完整示例。通过覆盖 plan()
方法,可验证用户是否提供了必需的 file
参数。如果用户还提供了可选的 nodes 参数,该方法还可以验证节点是否存在于 Vertica 群集中。如果任一参数存在问题,该方法将抛出异常并向用户返回错误。如果两个参数都没有问题,则 plan()
方法会将其值存储在计划上下文对象中。
package com.mycompany.UDL;
import java.util.ArrayList;
import java.util.Vector;
import com.vertica.sdk.NodeSpecifyingPlanContext;
import com.vertica.sdk.ParamReader;
import com.vertica.sdk.ParamWriter;
import com.vertica.sdk.ServerInterface;
import com.vertica.sdk.SizedColumnTypes;
import com.vertica.sdk.SourceFactory;
import com.vertica.sdk.UDSource;
import com.vertica.sdk.UdfException;
public class FileSourceFactory extends SourceFactory {
// Called once on the initiator host to do initial setup. Checks
// parameters and chooses which nodes will do the work.
@Override
public void plan(ServerInterface srvInterface,
NodeSpecifyingPlanContext planCtxt) throws UdfException {
String nodes; // stores the list of nodes that will load data
// Get copy of the parameters the user supplied to the UDSource
// function call.
ParamReader args = srvInterface.getParamReader();
// A list of nodes that will perform work. This gets saved as part
// of the plan context.
ArrayList<String> executionNodes = new ArrayList<String>();
// First, ensure the user supplied the file parameter
if (!args.containsParameter("file")) {
// Withut a file parameter, we cannot continue. Throw an
// exception that will be caught by the Java UDx framework.
throw new UdfException(0, "You must supply a file parameter");
}
// If the user specified nodes to read the file, parse the
// comma-separated list and save. Otherwise, assume just the
// Initiator node has the file to read.
if (args.containsParameter("nodes")) {
nodes = args.getString("nodes");
// Get list of nodes in cluster, to ensure that the node the
// user specified actually exists. The list of nodes is available
// from the planCTxt (plan context) object,
ArrayList<String> clusterNodes = planCtxt.getClusterNodes();
// Parse the string parameter "nodes" which
// is a comma-separated list of node names.
String[] nodeNames = nodes.split(",");
for (int i = 0; i < nodeNames.length; i++){
// See if the node the user gave us actually exists
if(clusterNodes.contains(nodeNames[i]))
// Node exists. Add it to list of nodes.
executionNodes.add(nodeNames[i]);
else{
// User supplied node that doesn't exist. Throw an
// exception so the user is notified.
String msg = String.format("Specified node '%s' but no" +
" node by that name is available. Available nodes "
+ "are \"%s\".",
nodeNames[i], clusterNodes.toString());
throw new UdfException(0, msg);
}
}
} else {
// User did not supply a list of node names. Assume the initiator
// is the only host that will read the file. The srvInterface
// instance passed to this method has a getter for the current
// node.
executionNodes.add(srvInterface.getCurrentNodeName());
}
// Set the target node(s) in the plan context
planCtxt.setTargetNodes(executionNodes);
// Set parameters for each node reading data that tells it which
// files it will read. In this simple example, just tell it to
// read all of the files the user passed in the file parameter
String files = args.getString("file");
// Get object to write parameters into the plan context object.
ParamWriter nodeParams = planCtxt.getWriter();
// Loop through list of execution nodes, and add a parameter to plan
// context named for each node performing the work, which tells it the
// list of files it will process. Each node will look for a
// parameter named something like "filesForv_vmart_node0002" in its
// prepareUDSources() method.
for (int i = 0; i < executionNodes.size(); i++) {
nodeParams.setString("filesFor" + executionNodes.get(i), files);
}
}
// Called on each host that is reading data from a source. This method
// returns an array of UDSource objects that process each source.
@Override
public ArrayList<UDSource> prepareUDSources(ServerInterface srvInterface,
NodeSpecifyingPlanContext planCtxt) throws UdfException {
// An array to hold the UDSource subclasses that we instaniate
ArrayList<UDSource> retVal = new ArrayList<UDSource>();
// Get the list of files this node is supposed to process. This was
// saved by the plan() method in the plancontext
String myName = srvInterface.getCurrentNodeName();
ParamReader params = planCtxt.getReader();
String fileNames = params.getString("filesFor" + myName);
// Note that you can also be lazy and directly grab the parameters
// the user passed to the UDSource functon in the COPY statement directly
// by getting parameters from the ServerInterface object. I.e.:
//String fileNames = srvInterface.getParamReader().getString("file");
// Split comma-separated list into a single list.
String[] fileList = fileNames.split(",");
for (int i = 0; i < fileList.length; i++){
// Instantiate a FileSource object (which is a subclass of UDSource)
// to read each file. The constructor for FileSource takes the
// file name of the
retVal.add(new FileSource(fileList[i]));
}
// Return the collection of FileSource objects. They will be called,
// in turn, to read each of the files.
return retVal;
}
// Declares which parameters that this factory accepts.
@Override
public void getParameterType(ServerInterface srvInterface,
SizedColumnTypes parameterTypes) {
parameterTypes.addVarchar(65000, "file");
parameterTypes.addVarchar(65000, "nodes");
}
}