C++ example: concurrent load
The FilePortionSource example demonstrates the use of concurrent load. This example is a refinement of the FileSource example. Each input file is divided into portions and distributed among FilePortionSource instances. The source accepts a list of offsets at which to break the input into portions; if offsets are not provided, the source divides the input dynamically.
Concurrent load is handled in the factory, so this discussion focuses on FilePortionSourceFactory. The full code for the example is located in /opt/vertica/sdk/examples/ApportionLoadFunctions. The distribution also includes a Java version of this example.
Loading and using the example
Load and use the FilePortionSource example as follows.
=> CREATE LIBRARY FilePortionLib AS '/home/dbadmin/FP.so';
=> CREATE SOURCE FilePortionSource AS LANGUAGE 'C++'
-> NAME 'FilePortionSourceFactory' LIBRARY FilePortionLib;
=> COPY t WITH SOURCE FilePortionSource(file='g1/*.dat', nodes='initiator,e0,e1', offsets = '0,380000,820000');
=> COPY t WITH SOURCE FilePortionSource(file='g2/*.dat', nodes='e0,e1,e2', local_min_portion_size = 2097152);
Implementation
Concurrent load affects the source factory in two places, getDesiredThreads() and prepareUDSourcesExecutor().
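The outline below shows roughly where these two member functions sit in the factory. It is a sketch for orientation only, not the shipped code: the remaining factory overrides and parameter handling are omitted, and the exact signatures should be checked against the SDK headers.

class FilePortionSourceFactory : public SourceFactory {
public:
    /* called on each executor node to report how many threads this factory wants */
    virtual ssize_t getDesiredThreads(ServerInterface &srvInterface,
                                      ExecutorPlanContext &planCtxt);

    /* called after getDesiredThreads() to build the UDSource instances */
    virtual void prepareUDSourcesExecutor(ServerInterface &srvInterface,
                                          ExecutorPlanContext &planCtxt,
                                          std::vector<UDSource *> &sources);

    // ... other overrides (parameter declarations, planning, etc.) elided ...
};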
getDesiredThreads()
The getDesiredThreads() member function determines the number of threads to request. Vertica calls this member function on each executor node before calling prepareUDSourcesExecutor().
The function begins by breaking the input file path, which might be a glob, into individual paths. This discussion omits those details. If apportioned load is not being used, the function allocates one source per file.
virtual ssize_t getDesiredThreads(ServerInterface &srvInterface,
                                  ExecutorPlanContext &planCtxt) {
    const std::string filename = srvInterface.getParamReader().getStringRef("file").str();
    std::vector<std::string> paths;
    // expand the glob - at least one thread per source.
    ...

    // figure out how to assign files to sources
    const std::string nodeName = srvInterface.getCurrentNodeName();
    const size_t nodeId = planCtxt.getWriter().getIntRef(nodeName);
    const size_t numNodes = planCtxt.getTargetNodes().size();

    if (!planCtxt.canApportionSource()) {
        /* no apportioning, so the number of files is the final number of sources */
        std::vector<std::string> *expanded =
            vt_createFuncObject<std::vector<std::string> >(srvInterface.allocator, paths);
        /* save expanded paths so we don't have to compute expansion again */
        planCtxt.getWriter().setPointer("expanded", expanded);
        return expanded->size();
    }
    // ...
If the source can be apportioned, then getDesiredThreads() uses the offsets that were passed as arguments to the factory to divide the file into portions. It then allocates the portions to the available nodes. This function does not actually assign sources directly; it does this work only to determine how many threads to request.
    else if (srvInterface.getParamReader().containsParameter("offsets")) {
        // if the offsets are specified, then we will have a fixed number of portions per file.
        // Round-robin assign offsets to nodes.
        // ...

        /* Construct the portions that this node will actually handle.
         * This isn't changing (since the offset assignments are fixed),
         * so we'll build the Portion objects now and make them available
         * to prepareUDSourcesExecutor() by putting them in the ExecutorContext.
         *
         * We don't know the size of the last portion, since it depends on the file
         * size. Rather than figure it out here we will indicate it with -1 and
         * defer that to prepareUDSourcesExecutor().
         */
        std::vector<Portion> *portions =
            vt_createFuncObject<std::vector<Portion>>(srvInterface.allocator);

        for (std::vector<size_t>::const_iterator offset = offsets.begin();
                offset != offsets.end(); ++offset) {
            Portion p(*offset);
            p.is_first_portion = (offset == offsets.begin());
            p.size = (offset + 1 == offsets.end() ? -1 : (*(offset + 1) - *offset));

            if ((offset - offsets.begin()) % numNodes == nodeId) {
                portions->push_back(p);
                srvInterface.log("FilePortionSource: assigning portion %ld: [offset = %lld, size = %lld]",
                        offset - offsets.begin(), p.offset, p.size);
            }
        }
The function now has all the portions, so it knows how many portions there are:
        planCtxt.getWriter().setPointer("portions", portions);
        /* total number of threads we want is the number of portions per file, which is fixed */
        return portions->size() * expanded->size();
    } // end of "offsets" parameter
If offsets are not supplied, the function divides the files into portions dynamically, one portion per thread. This discussion omits the details of this computation. There is no point in requesting more threads than are available, so the function calls getMaxAllowedThreads() on the plan context (an argument to the function) to set an upper bound:
if (portions->size() >= planCtxt.getMaxAllowedThreads()) {
    return paths.size();
}
See the full example for the details of how this function divides the files into portions.
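As a rough illustration only (this is not the example's actual code), dividing a single file of known size into a fixed number of roughly equal portions could look like the hypothetical helper below; splitEvenly is an invented name, while Portion is the struct used throughout this example.

/* Hypothetical helper, for illustration only: split one file of 'fileSize'
 * bytes into 'numPortions' roughly equal portions. */
static void splitEvenly(size_t fileSize, size_t numPortions,
                        std::vector<Portion> &out) {
    const size_t base = fileSize / numPortions;
    size_t offset = 0;
    for (size_t i = 0; i < numPortions; ++i) {
        Portion p(offset);
        p.is_first_portion = (i == 0);
        /* the final portion runs to the end of the file and absorbs the remainder */
        p.size = (i + 1 == numPortions) ? (fileSize - offset) : base;
        out.push_back(p);
        offset += p.size;
    }
}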
This function uses the vt_createFuncObject template to create objects. Vertica calls the destructors of the returned objects created this way, but it does not call the destructors of other objects, such as the vector. You must call these destructors yourself to avoid memory leaks. In this example, these calls are made in prepareUDSourcesExecutor().
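A minimal sketch of that cleanup is shown below. It assumes the vector was stored in the plan context under "portions" as shown earlier, and that the context reader provides a getPointer() counterpart to setPointer(); check the shipped example for the exact calls.

/* Sketch only; the getPointer() accessor is an assumption. */
typedef std::vector<Portion> PortionVec;
PortionVec *portions = static_cast<PortionVec *>(
        planCtxt.getReader().getPointer("portions"));
// ... build the UDSource instances from *portions ...
/* Vertica reclaims the allocator memory but does not run the vector's
 * destructor, so invoke it explicitly once it is no longer needed. */
portions->~PortionVec();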
prepareUDSourcesExecutor()
The prepareUDSourcesExecutor() member function, like getDesiredThreads(), has separate blocks of code depending on whether offsets are supplied. In both cases, the function breaks the input into portions and creates UDSource instances for them.
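Schematically, the dispatch looks something like the sketch below; retrieving the expanded paths and any precomputed portions from the plan context is elided here, so see the full example for the actual code.

virtual void prepareUDSourcesExecutor(ServerInterface &srvInterface,
                                      ExecutorPlanContext &planCtxt,
                                      std::vector<UDSource *> &sources) {
    /* sketch only: fetch expandedPaths, portions, and initialPortions
     * from the plan context and parameters here (elided) */
    if (srvInterface.getParamReader().containsParameter("offsets")) {
        /* fixed offsets: the portions were already built in getDesiredThreads() */
        prepareCustomizedPortions(srvInterface, planCtxt, sources, expandedPaths, *portions);
    } else {
        /* no offsets: divide the input dynamically based on available threads */
        prepareGeneratedPortions(srvInterface, planCtxt, sources, initialPortions);
    }
}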
If the function is called with offsets, prepareUDSourcesExecutor() calls prepareCustomizedPortions(). This function follows.
/* prepare portions as determined via the "offsets" parameter */
void prepareCustomizedPortions(ServerInterface &srvInterface,
                               ExecutorPlanContext &planCtxt,
                               std::vector<UDSource *> &sources,
                               const std::vector<std::string> &expandedPaths,
                               std::vector<Portion> &portions) {
    for (std::vector<std::string>::const_iterator filename = expandedPaths.begin();
            filename != expandedPaths.end(); ++filename) {
        /*
         * the "portions" vector contains the portions which were generated in
         * "getDesiredThreads"
         */
        const size_t fileSize = getFileSize(*filename);
        for (std::vector<Portion>::const_iterator portion = portions.begin();
                portion != portions.end(); ++portion) {
            Portion fportion(*portion);
            if (fportion.size == -1) {
                /* as described above, this means from the offset to the end */
                fportion.size = fileSize - portion->offset;
                sources.push_back(vt_createFuncObject<FilePortionSource>(srvInterface.allocator,
                        *filename, fportion));
            } else if (fportion.size > 0) {
                sources.push_back(vt_createFuncObject<FilePortionSource>(srvInterface.allocator,
                        *filename, fportion));
            }
        }
    }
}
If prepareUDSourcesExecutor() is called without offsets, then it must decide how many portions to create.
The base case is to use one portion per source. However, if extra threads are available, the function divides the input into more portions so that a source can process them concurrently. Then prepareUDSourcesExecutor() calls prepareGeneratedPortions() to create the portions. That function begins by calling getLoadConcurrency() on the plan context to find out how many threads are available.
void prepareGeneratedPortions(ServerInterface &srvInterface,
                              ExecutorPlanContext &planCtxt,
                              std::vector<UDSource *> &sources,
                              std::map<std::string, Portion> initialPortions) {
    if ((ssize_t) initialPortions.size() >= planCtxt.getLoadConcurrency()) {
        /* all threads will be used, don't bother splitting into portions */
        for (std::map<std::string, Portion>::const_iterator file = initialPortions.begin();
                file != initialPortions.end(); ++file) {
            sources.push_back(vt_createFuncObject<FilePortionSource>(srvInterface.allocator,
                    file->first, file->second));
        } // for
        return;
    } // if

    // Now we can split files to take advantage of potentially-unused threads.
    // First sort by size (descending), then we will split the largest first.
    // details elided...
}
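Purely as an illustration of one way the elided step could work (this is not the example's actual code), the function could repeatedly split the largest remaining portion until the number of portions reaches the load concurrency:

/* Illustration only: grow the portion list by halving the largest portion
 * until there is one portion per available thread. */
std::vector<std::pair<std::string, Portion> > work(initialPortions.begin(),
                                                   initialPortions.end());
while ((ssize_t) work.size() < planCtxt.getLoadConcurrency()) {
    size_t largest = 0;
    for (size_t i = 1; i < work.size(); ++i) {
        if (work[i].second.size > work[largest].second.size) largest = i;
    }
    Portion &p = work[largest].second;
    if (p.size < 2) break;                   /* nothing left worth splitting */
    Portion rest(p.offset + p.size / 2);     /* second half of the split */
    rest.size = p.size - p.size / 2;
    rest.is_first_portion = false;
    p.size /= 2;
    work.push_back(std::make_pair(work[largest].first, rest));
}
for (size_t i = 0; i < work.size(); ++i) {
    sources.push_back(vt_createFuncObject<FilePortionSource>(srvInterface.allocator,
            work[i].first, work[i].second));
}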
For more information
For the complete implementation of this example, see the source code.