C++ example: concurrent load

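The FilePortionSource example demonstrates how to use concurrent load. It refines the FileSource example. Each input file is divided into portions, and the portions are distributed to FilePortionSource instances. The source accepts a list of offsets at which to break the input into portions. If no offsets are provided, the source divides the input dynamically.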

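Concurrent load is handled in the factory, so this discussion focuses on FilePortionSourceFactory. The full code for the example is located in /opt/vertica/sdk/examples/ApportionLoadFunctions. The distribution also includes a Java version of this example.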

Loading and using the example

Load and use the FilePortionSource example as follows.

=> CREATE LIBRARY FilePortionLib AS '/home/dbadmin/FP.so';

=> CREATE SOURCE FilePortionSource AS LANGUAGE 'C++'
-> NAME 'FilePortionSourceFactory' LIBRARY FilePortionLib;

=> COPY t WITH SOURCE FilePortionSource(file='g1/*.dat', nodes='initiator,e0,e1', offsets = '0,380000,820000');

=> COPY t WITH SOURCE FilePortionSource(file='g2/*.dat', nodes='e0,e1,e2', local_min_portion_size = 2097152);
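The offsets argument in the first COPY statement is a comma-separated string. The example's parsing code is not shown here, so the following is a minimal sketch, assuming the string is split on commas into strictly increasing byte offsets. parseOffsets is a hypothetical helper name, not part of the Vertica SDK.

```cpp
#include <cassert>
#include <cstddef>
#include <sstream>
#include <stdexcept>
#include <string>
#include <vector>

// Hypothetical helper: parse an "offsets" argument such as "0,380000,820000"
// into byte offsets. Throws std::invalid_argument on an empty, negative, or
// non-numeric entry, or if the offsets are not strictly increasing.
std::vector<std::size_t> parseOffsets(const std::string &arg) {
    std::vector<std::size_t> offsets;
    std::stringstream ss(arg);
    std::string token;
    while (std::getline(ss, token, ',')) {
        // Trim surrounding spaces so "0, 380000" is accepted too.
        const std::size_t first = token.find_first_not_of(' ');
        if (first == std::string::npos) {
            throw std::invalid_argument("empty offset in list");
        }
        const std::size_t last = token.find_last_not_of(' ');
        assert(last >= first);
        token = token.substr(first, last - first + 1);
        // std::stoull would silently wrap a leading minus sign, so reject it.
        if (token[0] == '-') {
            throw std::invalid_argument("negative offset: " + token);
        }
        std::size_t consumed = 0;
        const unsigned long long value = std::stoull(token, &consumed);
        if (consumed != token.size()) {
            throw std::invalid_argument("bad offset: " + token);
        }
        if (!offsets.empty() && value <= offsets.back()) {
            throw std::invalid_argument("offsets must be strictly increasing");
        }
        offsets.push_back(static_cast<std::size_t>(value));
    }
    return offsets;
}
```

With '0,380000,820000', each file is divided into three portions starting at bytes 0, 380000, and 820000; the last portion runs to the end of the file.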

Implementation

Concurrent load affects the source factory in two places: getDesiredThreads() and prepareUDSourcesExecutor().

getDesiredThreads()

The getDesiredThreads() member function determines the number of threads to request. Vertica calls this member function on each executor node before calling prepareUDSourcesExecutor().

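The function begins by breaking the input file path, which might be a glob, into individual paths. This discussion omits those details. If apportioned load is not being used, the function allocates one source per file.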

virtual ssize_t getDesiredThreads(ServerInterface &srvInterface,
    ExecutorPlanContext &planCtxt) {
  const std::string filename = srvInterface.getParamReader().getStringRef("file").str();

  std::vector<std::string> paths;
  // expand the glob - at least one thread per source.
  ...

  // figure out how to assign files to sources
  const std::string nodeName = srvInterface.getCurrentNodeName();
  const size_t nodeId = planCtxt.getWriter().getIntRef(nodeName);
  const size_t numNodes = planCtxt.getTargetNodes().size();

  if (!planCtxt.canApportionSource()) {
    /* no apportioning, so the number of files is the final number of sources */
    std::vector<std::string> *expanded =
        vt_createFuncObject<std::vector<std::string> >(srvInterface.allocator, paths);
    /* save expanded paths so we don't have to compute expansion again */
    planCtxt.getWriter().setPointer("expanded", expanded);
    return expanded->size();
  }

  // ...
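The glob expansion elided in the block above could be done with POSIX glob(3). This is a sketch under that assumption; expandGlob is a hypothetical helper, and the full example may expand paths differently.

```cpp
#include <glob.h>
#include <cassert>
#include <cstddef>
#include <stdexcept>
#include <string>
#include <vector>

// Hypothetical helper: expand a pattern such as "g1/*.dat" into the matching
// paths using POSIX glob(3). A pattern with no matches yields an empty vector.
std::vector<std::string> expandGlob(const std::string &pattern) {
    glob_t g{};  // zero-initialized so globfree() is safe on every path
    const int rc = glob(pattern.c_str(), 0, nullptr, &g);
    std::vector<std::string> paths;
    if (rc == 0) {
        for (std::size_t i = 0; i < g.gl_pathc; ++i) {
            paths.emplace_back(g.gl_pathv[i]);
        }
    }
    globfree(&g);
    if (rc != 0 && rc != GLOB_NOMATCH) {
        throw std::runtime_error("glob failed for pattern: " + pattern);
    }
    assert(rc == 0 || paths.empty());
    return paths;
}
```

Because glob() sorts its results unless GLOB_NOSORT is passed, each node presumably sees the files in the same order, which matters when work is assigned by index.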

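If the source can be apportioned, getDesiredThreads() uses the offsets that were passed as arguments to the factory to divide the file into portions. It then assigns the portions to the available nodes. This function does not actually assign sources directly; it does this work only to determine how many threads to request.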

  else if (srvInterface.getParamReader().containsParameter("offsets")) {

    // if the offsets are specified, then we will have a fixed number of portions per file.
    // Round-robin assign offsets to nodes.
    // ...

    /* Construct the portions that this node will actually handle.
     * This isn't changing (since the offset assignments are fixed),
     * so we'll build the Portion objects now and make them available
     * to prepareUDSourcesExecutor() by putting them in the ExecutorContext.
     *
     * We don't know the size of the last portion, since it depends on the file
     * size.  Rather than figure it out here we will indicate it with -1 and
     * defer that to prepareUDSourcesExecutor().
     */
    std::vector<Portion> *portions =
        vt_createFuncObject<std::vector<Portion>>(srvInterface.allocator);

    for (std::vector<size_t>::const_iterator offset = offsets.begin();
            offset != offsets.end(); ++offset) {
        Portion p(*offset);
        p.is_first_portion = (offset == offsets.begin());
        p.size = (offset + 1 == offsets.end() ? -1 : (*(offset + 1) - *offset));

        if ((offset - offsets.begin()) % numNodes == nodeId) {
            portions->push_back(p);
            srvInterface.log("FilePortionSource: assigning portion %ld: [offset = %lld, size = %lld]",
                    offset - offsets.begin(), p.offset, p.size);
        }
      }

The function now has all the portions, so it knows how many there are:

      planCtxt.getWriter().setPointer("portions", portions);

      /* total number of threads we want is the number of portions per file, which is fixed */
      return portions->size() * expanded->size();
    } // end of "offsets" parameter
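The round-robin step in the block above can be tried in isolation. The sketch below assumes a simplified Portion struct (the example's real Portion type is not shown in full) and takes the node's 0-based index directly instead of looking it up through the plan context. portionsForNode and resolvedSize are hypothetical names; resolvedSize mirrors the -1 handling in prepareCustomizedPortions().

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Simplified, hypothetical stand-in for the example's Portion type.
struct Portion {
    long long offset;
    long long size;          // -1 means "from offset to the end of the file"
    bool is_first_portion;
};

// Build the portions that node `nodeId` (0-based, out of `numNodes`) handles,
// assigning offsets round-robin. The last portion's size depends on the file
// size, which is not known yet, so it is marked with -1.
std::vector<Portion> portionsForNode(const std::vector<long long> &offsets,
                                     std::size_t numNodes, std::size_t nodeId) {
    assert(numNodes > 0 && nodeId < numNodes);
    std::vector<Portion> mine;
    for (std::size_t i = 0; i < offsets.size(); ++i) {
        Portion p;
        p.offset = offsets[i];
        p.is_first_portion = (i == 0);
        p.size = (i + 1 == offsets.size()) ? -1 : offsets[i + 1] - offsets[i];
        if (i % numNodes == nodeId) {
            mine.push_back(p);
        }
    }
    return mine;
}

// Resolve a -1 size once the size of a particular file is known.
long long resolvedSize(const Portion &p, long long fileSize) {
    return p.size == -1 ? fileSize - p.offset : p.size;
}
```

For the first COPY statement (three offsets, three nodes), each node gets one portion per file: index 0 gets [0, 380000), index 1 gets [380000, 820000), and index 2 gets everything from 820000 to the end of the file.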

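If no offsets are provided, the function divides the file into portions dynamically, one portion per thread. This discussion omits the details of that computation. There is no point in requesting more threads than are available, so the function calls getMaxAllowedThreads() on the PlanContext (an argument to the function) to set an upper bound: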

  if (portions->size() >= planCtxt.getMaxAllowedThreads()) {
    return paths.size();
  }

For details about how this function divides the file into portions, see the full example.
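The full example's dynamic split is not reproduced here. As one plausible approach, and only an assumption about how such a split could work, a file can be divided into near-equal contiguous ranges whose count is capped by the thread limit (the role getMaxAllowedThreads() plays above). splitEvenly and Range are hypothetical names.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <vector>

struct Range {
    long long offset;
    long long size;
};

// Hypothetical sketch: split one file of `fileSize` bytes into `desired`
// contiguous ranges, never more than `maxThreads` and never more than one
// range per byte. Earlier ranges absorb the remainder, so sizes differ by at most 1.
std::vector<Range> splitEvenly(long long fileSize, std::size_t desired,
                               std::size_t maxThreads) {
    assert(fileSize >= 0 && maxThreads > 0);
    std::size_t count = std::max<std::size_t>(std::min(desired, maxThreads), 1);
    if (fileSize == 0) {
        count = 1;  // an empty file is a single empty range
    } else {
        count = std::min<std::size_t>(count, static_cast<std::size_t>(fileSize));
    }
    const long long n = static_cast<long long>(count);
    const long long base = fileSize / n;
    const long long extra = fileSize % n;
    std::vector<Range> ranges;
    long long offset = 0;
    for (long long i = 0; i < n; ++i) {
        const long long size = base + (i < extra ? 1 : 0);
        ranges.push_back({offset, size});
        offset += size;
    }
    return ranges;
}
```

For a 1000-byte file with three desired portions and a limit of eight threads, this yields ranges starting at 0, 334, and 667 with sizes 334, 333, and 333.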

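This function uses the vt_createFuncObject template to create objects. Vertica calls the destructors of returned objects created with vt_createFuncObject, but it does not call the destructors of other objects, such as vectors. You must call those destructors yourself to avoid memory leaks. In this example, these calls are made in prepareUDSourcesExecutor().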
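To illustrate why the explicit destructor calls are needed, the sketch below uses a hypothetical arena (Arena) and placement-new helper (createInArena) that stand in for srvInterface.allocator and vt_createFuncObject. Their internals are an assumption, not the SDK's implementation. Memory handed out by such an arena is reclaimed in bulk, so an object that owns heap memory of its own, such as a std::vector, leaks unless its destructor is run explicitly.

```cpp
#include <cassert>
#include <cstddef>
#include <new>
#include <string>
#include <utility>
#include <vector>

// Hypothetical bump allocator standing in for srvInterface.allocator. It hands
// out raw memory and releases it all at once, but never runs destructors.
class Arena {
public:
    explicit Arena(std::size_t capacity) : buf_(capacity), used_(0) {}

    void *allocate(std::size_t n, std::size_t align) {
        const std::size_t start = (used_ + align - 1) / align * align;
        assert(start + n <= buf_.size() && "arena exhausted");
        used_ = start + n;
        return buf_.data() + start;
    }

private:
    std::vector<unsigned char> buf_;
    std::size_t used_;
};

// Placement-new a T into arena memory, similar in spirit to vt_createFuncObject.
template <typename T, typename... Args>
T *createInArena(Arena &arena, Args &&...args) {
    void *mem = arena.allocate(sizeof(T), alignof(T));
    return new (mem) T(std::forward<Args>(args)...);
}

// Build a vector in the arena, use it, then destroy it explicitly.
std::size_t countThenDestroy(Arena &arena) {
    using PathList = std::vector<std::string>;
    PathList *paths = createInArena<PathList>(arena);
    paths->push_back("g1/a.dat");
    paths->push_back("g1/b.dat");
    const std::size_t n = paths->size();
    // The arena never runs ~vector(), so without this call the vector's own
    // heap buffer (and the strings in it) would leak.
    paths->~PathList();
    return n;
}
```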

prepareUDSourcesExecutor()

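Like getDesiredThreads(), the prepareUDSourcesExecutor() member function has separate blocks of code depending on whether offsets are provided. In both cases, the function breaks the input into portions and creates UDSource instances for them.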

If the function is called with offsets, prepareUDSourcesExecutor() calls prepareCustomizedPortions(), shown here:

/* prepare portions as determined via the "offsets" parameter */
void prepareCustomizedPortions(ServerInterface &srvInterface,
                               ExecutorPlanContext &planCtxt,
                               std::vector<UDSource *> &sources,
                               const std::vector<std::string> &expandedPaths,
                               std::vector<Portion> &portions) {
    for (std::vector<std::string>::const_iterator filename = expandedPaths.begin();
            filename != expandedPaths.end(); ++filename) {
        /*
         * the "portions" vector contains the portions which were generated in
         * "getDesiredThreads"
         */
        const size_t fileSize = getFileSize(*filename);
        for (std::vector<Portion>::const_iterator portion = portions.begin();
                portion != portions.end(); ++portion) {
            Portion fportion(*portion);
            if (fportion.size == -1) {
                /* as described above, this means from the offset to the end */
                fportion.size = fileSize - portion->offset;
                sources.push_back(vt_createFuncObject<FilePortionSource>(srvInterface.allocator,
                            *filename, fportion));
            } else if (fportion.size > 0) {
                sources.push_back(vt_createFuncObject<FilePortionSource>(srvInterface.allocator,
                            *filename, fportion));
            }
        }
    }
}

If prepareUDSourcesExecutor() is called without offsets, it must decide how many portions to create.

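The base rule is one portion per source. However, if additional threads are available, the function divides the input into more portions so that sources can process them concurrently. prepareUDSourcesExecutor() then calls prepareGeneratedPortions() to create the portions. That function begins by calling getLoadConcurrency() on the plan context to find out how many threads are available.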

void prepareGeneratedPortions(ServerInterface &srvInterface,
                              ExecutorPlanContext &planCtxt,
                              std::vector<UDSource *> &sources,
                              std::map<std::string, Portion> initialPortions) {

  if ((ssize_t) initialPortions.size() >= planCtxt.getLoadConcurrency()) {
    /* all threads will be used, don't bother splitting into portions */
    for (std::map<std::string, Portion>::const_iterator file = initialPortions.begin();
         file != initialPortions.end(); ++file) {
      sources.push_back(vt_createFuncObject<FilePortionSource>(srvInterface.allocator,
              file->first, file->second));
    } // for
    return;
  } // if

  // Now we can split files to take advantage of potentially-unused threads.
  // First sort by size (descending), then we will split the largest first.

  // details elided...

}
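The elided splitting step is described only by the comment above: sort by size in descending order, then split the largest first. One way to realize that, offered as a sketch rather than the example's actual code, is a max-heap that repeatedly halves the largest portion until there is one portion per available thread. FilePortion and splitLargestFirst are hypothetical names, and halving is an assumed split rule.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <map>
#include <queue>
#include <string>
#include <vector>

struct FilePortion {
    std::string file;
    long long offset;
    long long size;
    bool is_first_portion;
};

// Max-heap order: larger portions come out first; ties go to the earlier
// file, then the earlier offset, so the result is deterministic.
struct SmallerFirst {
    bool operator()(const FilePortion &a, const FilePortion &b) const {
        if (a.size != b.size) return a.size < b.size;
        if (a.file != b.file) return a.file > b.file;
        return a.offset > b.offset;
    }
};

// Hypothetical sketch: start with one whole-file portion per file, then keep
// halving the largest portion until there is one portion per available thread
// or nothing left is large enough to split.
std::vector<FilePortion> splitLargestFirst(
        const std::map<std::string, long long> &fileSizes,
        std::size_t concurrency) {
    assert(concurrency > 0);
    std::priority_queue<FilePortion, std::vector<FilePortion>, SmallerFirst> heap;
    for (const auto &f : fileSizes) {
        heap.push({f.first, 0, f.second, true});
    }
    while (heap.size() < concurrency && !heap.empty() && heap.top().size >= 2) {
        const FilePortion big = heap.top();
        heap.pop();
        const long long half = big.size / 2;
        heap.push({big.file, big.offset, half, big.is_first_portion});
        heap.push({big.file, big.offset + half, big.size - half, false});
    }
    std::vector<FilePortion> result;
    while (!heap.empty()) {
        result.push_back(heap.top());
        heap.pop();
    }
    // Report portions in file/offset order.
    std::sort(result.begin(), result.end(),
              [](const FilePortion &a, const FilePortion &b) {
                  return a.file != b.file ? a.file < b.file : a.offset < b.offset;
              });
    return result;
}
```

For files of 1000 and 200 bytes with a load concurrency of 4, the 1000-byte file ends up in portions of 250, 250, and 500 bytes, and the 200-byte file stays whole. The sketch computes byte ranges only; it does not deal with records that cross a portion boundary.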

For more information

See the source code for the full implementation of this example.