C++ example: concurrent load
The FilePortionSource example demonstrates the use of concurrent load. This example is a refinement of the FileSource example. Each input file is divided into portions and distributed to FilePortionSource instances. The source accepts a list of offsets at which to break the input into portions; if offsets are not provided, the source divides the input dynamically.
Concurrent load is handled in the factory, so this discussion focuses on FilePortionSourceFactory. The full code for the example is located in /opt/vertica/sdk/examples/ApportionLoadFunctions. The distribution also includes a Java version of this example.
Loading and using the example
Load and use the FilePortionSource example as follows.
=> CREATE LIBRARY FilePortionLib AS '/home/dbadmin/FP.so';
=> CREATE SOURCE FilePortionSource AS LANGUAGE 'C++'
-> NAME 'FilePortionSourceFactory' LIBRARY FilePortionLib;
=> COPY t WITH SOURCE FilePortionSource(file='g1/*.dat', nodes='initiator,e0,e1', offsets = '0,380000,820000');
=> COPY t WITH SOURCE FilePortionSource(file='g2/*.dat', nodes='e0,e1,e2', local_min_portion_size = 2097152);
Implementation
Concurrent load affects the source factory in two places: getDesiredThreads() and prepareUDSourcesExecutor().
getDesiredThreads()
The getDesiredThreads() member function determines the number of threads to request. Vertica calls this member function on each executor node before calling prepareUDSourcesExecutor().
The function begins by breaking an input file path, which might be a glob, into individual paths. This discussion omits those details. If apportioned load is not being used, then the function allocates one source per file.
If the source can be apportioned, getDesiredThreads() uses the offsets that were passed as arguments to the factory to divide the file into portions. It then allocates the portions to available nodes. The function does not assign sources at this point; it does this work only to determine how many threads to request.
The function now has all the portions, and therefore the number of portions, which determines how many threads to request.
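The following sketch shows how splitting at offsets and spreading the portions over nodes could work. It is a standalone model and does not use the Vertica SDK. The Portion struct, the planPortions() helper, the round-robin node assignment, and the assumption that the offsets are ascending and start at 0 are all hypothetical; the real example may allocate portions differently.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <string>
#include <vector>

// Hypothetical stand-in for one planned portion; not a Vertica SDK type.
struct Portion {
    std::size_t offset;  // first byte of the portion
    std::size_t size;    // number of bytes in the portion
    std::string node;    // node the portion is planned for
};

// Split a file of fileSize bytes at the given ascending offsets (assumed to
// start at 0), then spread the portions across nodes in round-robin order.
std::vector<Portion> planPortions(std::size_t fileSize,
                                  const std::vector<std::size_t>& offsets,
                                  const std::vector<std::string>& nodes) {
    assert(!nodes.empty());
    std::vector<Portion> portions;
    for (std::size_t i = 0; i < offsets.size(); ++i) {
        std::size_t begin = offsets[i];
        std::size_t end = (i + 1 < offsets.size()) ? offsets[i + 1] : fileSize;
        end = std::min(end, fileSize);
        if (begin >= end) continue;  // skip empty or out-of-range portions
        const std::string& node = nodes[portions.size() % nodes.size()];
        portions.push_back({begin, end - begin, node});
    }
    return portions;
}
```

With the offsets from the first COPY statement above and a hypothetical 1,000,000-byte file, planPortions() produces three portions of 380000, 440000, and 180000 bytes, planned for initiator, e0, and e1. Counting them gives the figure that a thread request would be based on.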
If offsets were not provided, the function divides the file into portions dynamically, one portion per thread. This discussion omits the details of this computation. There is no point in requesting more threads than are available, so the function calls getMaxAllowedThreads() on the PlanContext (an argument to the function) to set an upper bound.
See the full example for the details of how this function divides the file into portions.
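The sketch below illustrates the upper bound. It assumes, purely for illustration, that the desired number of portions comes from the file size and a minimum portion size like the local_min_portion_size argument in the second COPY statement. Both helpers are hypothetical. The maxAllowedThreads parameter stands in for the value that getMaxAllowedThreads() returns. No Vertica types are used.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>

// Hypothetical: how many portions a file of fileSize bytes could be cut into
// if no portion may be smaller than minPortionSize bytes (at least 1).
std::size_t desiredPortions(std::size_t fileSize, std::size_t minPortionSize) {
    assert(minPortionSize > 0);
    std::size_t n = fileSize / minPortionSize;  // round down: every portion >= minimum
    return std::max<std::size_t>(1, n);
}

// One portion per thread, but never more threads than the plan context
// allows; maxAllowedThreads stands in for getMaxAllowedThreads().
std::size_t threadsToRequest(std::size_t portions, std::size_t maxAllowedThreads) {
    assert(maxAllowedThreads > 0);
    return std::max<std::size_t>(1, std::min(portions, maxAllowedThreads));
}
```

For a 20 MiB file and a 2 MiB minimum, desiredPortions() returns 10. If only four threads are allowed, threadsToRequest(10, 4) caps the request at 4.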
This function uses the vt_createFuncObject template to create objects. Vertica calls the destructors of returned objects created with vt_createFuncObject, but it does not call destructors for other objects, such as vectors. You must call these destructors yourself to avoid memory leaks. In this example, these calls are made in prepareUDSourcesExecutor().
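The underlying C++ issue is general. If an object is built with placement new in memory that is released in bulk, nothing runs its destructor, so anything the object owns (such as a vector's heap buffer) leaks. The sketch below demonstrates this with a hypothetical Arena class and uses std::destroy_at for the explicit destructor call. Arena, createInArena(), destroyInArena(), and PortionList are illustrative names only. They do not model the Vertica allocator API.

```cpp
#include <cassert>
#include <cstddef>
#include <memory>
#include <new>
#include <utility>
#include <vector>

// Hypothetical arena: hands out raw memory and releases it all at once when
// the arena itself is destroyed. It never runs destructors of objects
// constructed in that memory.
class Arena {
public:
    explicit Arena(std::size_t capacity) : buffer_(capacity), used_(0) {}

    void* allocate(std::size_t bytes, std::size_t align) {
        assert(align <= alignof(std::max_align_t));
        std::size_t start = (used_ + align - 1) / align * align;  // round up
        assert(start + bytes <= buffer_.size());
        used_ = start + bytes;
        return buffer_.data() + start;
    }

private:
    std::vector<unsigned char> buffer_;
    std::size_t used_;
};

// Construct a T in arena memory with placement new.
template <typename T, typename... Args>
T* createInArena(Arena& arena, Args&&... args) {
    void* mem = arena.allocate(sizeof(T), alignof(T));
    return new (mem) T(std::forward<Args>(args)...);
}

// The arena will not destroy the object, so call its destructor explicitly.
template <typename T>
void destroyInArena(T* obj) {
    std::destroy_at(obj);
}

// Example payload that counts live instances so the cleanup is observable.
struct PortionList {
    inline static int live = 0;
    std::vector<std::size_t> offsets;
    explicit PortionList(std::vector<std::size_t> o) : offsets(std::move(o)) { ++live; }
    ~PortionList() { --live; }
};
```

If the destroyInArena() call were omitted, PortionList::live would stay at 1 and the vector's buffer would never be freed, even after the arena itself was destroyed.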
prepareUDSourcesExecutor()
The prepareUDSourcesExecutor() member function, like getDesiredThreads(), has separate blocks of code depending on whether offsets are provided. In both cases, the function breaks the input into portions and creates UDSource instances for them.
If the function is called with offsets, prepareUDSourcesExecutor() calls prepareCustomizedPortions().
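On a given executor node, creating sources for offset-based portions could look like the sketch below. It keeps the portions planned for that node and makes one source per portion. The Portion and PortionSource structs and the sourcesForNode() helper are hypothetical. The real example creates UDSource subclasses with vt_createFuncObject, not plain structs.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <vector>

// Hypothetical stand-ins, not Vertica SDK types.
struct Portion {
    std::size_t offset;
    std::size_t size;
    std::string node;
};

struct PortionSource {
    std::string file;    // path of the input file
    std::size_t offset;  // where this source starts reading
    std::size_t size;    // how many bytes it reads
};

// Keep only the portions planned for thisNode and create one source for each.
std::vector<PortionSource> sourcesForNode(const std::string& file,
                                          const std::vector<Portion>& portions,
                                          const std::string& thisNode) {
    assert(!file.empty());  // a source needs a file to read
    std::vector<PortionSource> sources;
    for (const Portion& p : portions) {
        if (p.node == thisNode) {
            sources.push_back({file, p.offset, p.size});
        }
    }
    return sources;
}
```

A node that received no portions gets an empty list and creates no sources.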
If prepareUDSourcesExecutor() is called without offsets, it must decide how many portions to create.
The base case is to use one portion per source. However, if extra threads are available, the function divides the input into more portions so that a source can process them concurrently. Then prepareUDSourcesExecutor() calls prepareGeneratedPortions() to create the portions. This function begins by calling getLoadConcurrency() on the plan context to find out how many threads are available.
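The sketch below shows one way to apply that rule: start with one portion per source, then use any extra threads to cut each file into more, nearly equal portions. The loadConcurrency parameter stands in for the value that getLoadConcurrency() returns. Both helpers and the even-split strategy are assumptions, not the example's actual computation.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <utility>
#include <vector>

// Base case is one portion per source (one source per file); if more threads
// are available, cut each file into more portions.
std::size_t portionsPerFile(std::size_t numFiles, std::size_t loadConcurrency) {
    assert(numFiles > 0);
    return std::max<std::size_t>(1, loadConcurrency / numFiles);
}

// Split [0, fileSize) into at most n contiguous (offset, size) pairs whose
// sizes differ by at most one byte; empty portions are not produced.
std::vector<std::pair<std::size_t, std::size_t>>
evenPortions(std::size_t fileSize, std::size_t n) {
    assert(n > 0);
    std::vector<std::pair<std::size_t, std::size_t>> portions;
    const std::size_t base = fileSize / n;
    const std::size_t extra = fileSize % n;  // first `extra` portions get one more byte
    std::size_t offset = 0;
    for (std::size_t i = 0; i < n; ++i) {
        std::size_t size = base + (i < extra ? 1 : 0);
        if (size == 0) break;  // file is smaller than n bytes
        portions.emplace_back(offset, size);
        offset += size;
    }
    return portions;
}
```

With two files and eight available threads, each file is cut into four portions. With four files and two threads, the base case of one portion per file applies. This sketch ignores records that cross portion boundaries.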
For more information
See the source code for the full implementation of this example.