Vertica can divide the work of loading data, taking advantage of parallelism to speed up the operation. Vertica supports several types of parallelism:
Distributed load: Vertica distributes files in a multi-file load to several nodes to load in parallel, instead of loading all of them on a single node. Vertica manages distributed load; you do not need to do anything special in your UDL.
Cooperative parse: A source being loaded on a single node uses multi-threading to parallelize the parse. Cooperative parse divides a load at execution time, based on how threads are scheduled. You must enable cooperative parse in your parser. See Cooperative parse.
Apportioned load: Vertica divides a single large file or other single source into segments, which it assigns to several nodes to load in parallel. Apportioned load divides the load at planning time, based on available nodes and cores on each node. You must enable apportioned load in your source and parser. See Apportioned load.
You can support both cooperative parse and apportioned load in the same UDL. Vertica decides which to use for each load operation and might use both. See Combining cooperative parse and apportioned load.
1 - Cooperative parse
By default, Vertica parses a data source in a single thread on one database node. You can optionally use cooperative parse to parse a source using multiple threads on a node. More specifically, data from a source passes through a chunker that groups blocks from the source stream into logical units called chunks, each of which can be parsed independently of the others. Parsers then parse the chunks concurrently. Cooperative parse is available only for unfenced UDxs. (See Fenced and unfenced modes.)
To use cooperative parse, a chunker must be able to locate end-of-record markers in the input. Locating these markers might not be possible in all input formats.
Chunkers are created by parser factories. At load time, Vertica first calls the UDChunker to divide the input into chunks and then calls the UDParser to parse each chunk.
When Vertica receives data from a source, it calls the chunker's process() method repeatedly. A chunker is, essentially, a lightweight parser; instead of parsing, the process() method divides the input into chunks.
After the chunker has finished dividing the input into chunks, Vertica sends those chunks to as many parsers as are available, calling the process() method on each parser.
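The division into chunks can be illustrated outside the SDK. The following is a minimal sketch, not the UDChunker API: splitIntoChunks() and its minChunkSize parameter are hypothetical names introduced here, and the sketch assumes the whole input is already in memory and that records end with a single-character terminator. A real chunker instead receives the input incrementally through process().

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <vector>

// Hypothetical helper, not part of the Vertica SDK.
// Splits `input` into chunks of at least `minChunkSize` bytes, each ending
// on a record terminator, so that every chunk holds only whole records.
std::vector<std::string> splitIntoChunks(const std::string& input,
                                         char terminator,
                                         std::size_t minChunkSize)
{
    assert(minChunkSize > 0);
    std::vector<std::string> chunks;
    std::size_t chunkStart = 0;
    while (chunkStart < input.size()) {
        // Find the first terminator at or beyond the minimum chunk size.
        const std::size_t term =
            input.find(terminator, chunkStart + minChunkSize - 1);
        // If no terminator remains, the rest of the input is the final chunk.
        const std::size_t chunkEnd =
            (term == std::string::npos) ? input.size() : term + 1;
        chunks.push_back(input.substr(chunkStart, chunkEnd - chunkStart));
        chunkStart = chunkEnd;
    }
    return chunks;
}
```

Because each chunk ends on a record boundary, a parser can process any chunk without seeing its neighbors, which is what makes parallel parsing possible.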
Implementing cooperative parse
To implement cooperative parse, perform the following actions:
Subclass UDChunker and implement process().
In your ParserFactory, implement prepareChunker() to return a UDChunker.
2 - Apportioned load
A parser can use more than one database node to load a single input source in parallel. This approach is referred to as apportioned load. Among the parsers built into Vertica, the default (delimited) parser supports apportioned load.
Apportioned load, like cooperative parse, requires an input that can be divided at record boundaries. The difference is that cooperative parse does a sequential scan to find record boundaries, while apportioned load first jumps (seeks) to a given position and then scans. Some formats, like generic XML, do not support seeking.
To use apportioned load, you must ensure that the source is reachable by all participating database nodes. You typically use apportioned load with distributed file systems.
A parser that does not support apportioned load directly can still have a chunker that supports apportioning.
If both the parser and its source support apportioning, then you can specify that a single input is to be distributed to multiple database nodes for loading. The SourceFactory breaks the input into portions and assigns them to execution nodes. Each Portion consists of an offset into the input and a size. Vertica distributes the portions and their parameters to the execution nodes. A source factory running on each node produces a UDSource for the given portion.
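As an illustration of how an input might be broken into portions, here is a minimal sketch under stated assumptions. PortionSketch and planPortions() are hypothetical names, not SDK types, and the even split by byte count is only one possible policy. An actual SourceFactory chooses portion sizes in plan() and can weigh the available nodes and cores differently.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical stand-in for a portion: an offset into the input and a size.
struct PortionSketch {
    std::size_t offset;
    std::size_t size;
};

// Hypothetical planner: divides `inputSize` bytes into `nodeCount`
// contiguous portions of nearly equal size, one per execution node.
std::vector<PortionSketch> planPortions(std::size_t inputSize,
                                        std::size_t nodeCount)
{
    assert(nodeCount > 0);
    std::vector<PortionSketch> portions;
    const std::size_t base = inputSize / nodeCount;
    const std::size_t extra = inputSize % nodeCount;  // leftover bytes
    std::size_t offset = 0;
    for (std::size_t i = 0; i < nodeCount; ++i) {
        // The first `extra` portions each take one leftover byte.
        const std::size_t size = base + (i < extra ? 1 : 0);
        portions.push_back({offset, size});
        offset += size;
    }
    return portions;
}
```

Note that the portion boundaries fall at arbitrary byte positions, not at record boundaries. Aligning to records is the parser's job.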
The UDParser first determines where to start parsing:
If the portion is the first one in the input, the parser advances to the offset and begins parsing.
If the portion is not the first, the parser advances to the offset and then scans until it finds the end of a record. Because records can break across portions, parsing begins after the first record-end encountered.
The parser must complete a record, which might require it to read past the end of the portion. The parser is responsible for parsing all records that begin in the assigned portion, regardless of where they end. Most of this work occurs within the process() method of the parser.
Sometimes, a portion contains nothing to be parsed by its assigned node. For example, suppose you have a record that begins in portion 1, runs through all of portion 2, and ends in portion 3. The parser assigned to portion 1 parses the record, and the parser assigned to portion 3 starts after that record. The parser assigned to portion 2, however, has no record starting within its portion.
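The rules above can be sketched as a standalone function. This is an illustration only, not SDK code: recordsForPortion() is a hypothetical name, the input is assumed to be fully in memory, and records are assumed to end with a single-character terminator. One further assumption is labeled in the code: the scan for the first record end starts one byte before the offset, so that a record beginning exactly at the offset is treated as belonging to this portion.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <string>
#include <vector>

// Hypothetical helper, not part of the Vertica SDK.
// Returns every record that begins inside [offset, offset + size),
// reading past the end of the portion if needed to complete a record.
std::vector<std::string> recordsForPortion(const std::string& input,
                                           std::size_t offset,
                                           std::size_t size,
                                           char terminator)
{
    assert(offset <= input.size());
    std::vector<std::string> records;
    const std::size_t portionEnd = std::min(offset + size, input.size());
    std::size_t pos = offset;
    if (offset > 0) {
        // Not the first portion: skip to just after the first record end.
        // Assumption: searching from offset - 1 keeps a record that starts
        // exactly at the offset in this portion.
        const std::size_t term = input.find(terminator, offset - 1);
        if (term == std::string::npos) return records;
        pos = term + 1;
    }
    // Parse each record that starts before the end of the portion.
    while (pos < portionEnd) {
        const std::size_t term = input.find(terminator, pos);
        if (term == std::string::npos) {
            records.push_back(input.substr(pos));  // unterminated last record
            break;
        }
        records.push_back(input.substr(pos, term - pos));
        pos = term + 1;
    }
    return records;
}
```

For the 17-byte input aa~bbbbbbbbbb~cc~ split into portions of 5 bytes, the first portion yields aa and the long record that ends in the third portion, the second portion yields nothing because no record starts in it, and the third portion yields cc.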
If the load also uses cooperative parse, then after apportioning the load and before parsing, Vertica divides portions into chunks for parallel loading.
Implementing apportioned load
To implement apportioned load, perform the following actions in the source, the parser, and their factories.
In your SourceFactory subclass:
Implement isSourceApportionable() and return true.
Implement plan() to determine portion size, designate portions, and assign portions to execution nodes. To assign portions to particular executors, pass the information using the parameter writer on the plan context (PlanContext::getWriter()).
Implement prepareUDSources(). Vertica calls this method on each execution node with the plan context created by the factory. This method returns the UDSource instances to be used for this node's assigned portions.
If sources can take advantage of parallelism, you can implement getDesiredThreads() to request a number of threads for each source. See SourceFactory class for more information about this method.
In your UDSource subclass, implement process() as you would for any other source, using the assigned portion. You can retrieve this portion with getPortion().
In your ParserFactory subclass:
Implement isParserApportionable() and return true.
If your parser uses a UDChunker that supports apportioned load, implement isChunkerApportionable().
In your UDParser subclass:
Write your UDParser subclass to operate on portions rather than whole sources. You can do so by handling the stream states PORTION_START and PORTION_END, or by using the ContinuousUDParser API. Your parser must scan for the beginning of the portion, find the first record boundary after that position, and parse to the end of the last record beginning in that portion. Be aware that this behavior might require that the parser read beyond the end of the portion.
Handle the special case of a portion containing no record start by returning without writing any output.
In your UDChunker subclass, implement alignPortion(). See Aligning Portions.
Example
The SDK provides a C++ example of apportioned load in the ApportionLoadFunctions directory:
FilePortionSource is a subclass of UDSource.
DelimFilePortionParser is a subclass of ContinuousUDParser.
Use these classes together. You could also use FilePortionSource with the built-in delimited parser.
The following example shows how you can load the libraries and create the functions in the database:
=> CREATE LIBRARY FilePortionSourceLib AS '/home/dbadmin/FP.so';
=> CREATE LIBRARY DelimFilePortionParserLib AS '/home/dbadmin/Delim.so';
=> CREATE SOURCE FilePortionSource AS
LANGUAGE 'C++' NAME 'FilePortionSourceFactory' LIBRARY FilePortionSourceLib;
=> CREATE PARSER DelimFilePortionParser AS
LANGUAGE 'C++' NAME 'DelimFilePortionParserFactory' LIBRARY DelimFilePortionParserLib;
The following example shows how you can use the source and parser to load data:
=> COPY t WITH SOURCE FilePortionSource(file='g1/*.dat')
   PARSER DelimFilePortionParser(delimiter = '|', record_terminator = '~');
3 - Combining cooperative parse and apportioned load
You can enable both cooperative parse and apportioned load in the same parser, allowing Vertica to decide how to load data.
Vertica uses apportioned load, where possible, at query-planning time. It decides whether to also use cooperative parse at execution time.
Apportioned load requires SourceFactory support. Given a suitable UDSource, at planning time Vertica calls the isParserApportionable() method on the ParserFactory. If this method returns true, Vertica apportions the load.
If isParserApportionable() returns false but isChunkerApportionable() returns true, then a chunker is available for cooperative parse and that chunker supports apportioned load. Vertica apportions the load.
If neither of these methods returns true, then Vertica does not apportion the load.
At execution time, Vertica first checks whether the load is running in unfenced mode and proceeds only if it is. Cooperative parse is not supported in fenced mode.
If the load is not apportioned, and more than one thread is available, Vertica uses cooperative parse.
If the load is apportioned, and exactly one thread is available, Vertica uses cooperative parse if and only if the parser is not apportionable. In this case, the chunker is apportionable but the parser is not.
If the load is apportioned, and more than one thread is available, and the chunker is apportionable, Vertica uses cooperative parse.
If Vertica uses cooperative parse but prepareChunker() does not return a UDChunker instance, Vertica reports an error.
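The planning-time and execution-time rules in this section can be condensed into two predicates. This is a reading aid under stated assumptions, not Vertica's implementation: LoadTraits, apportionsLoad(), and usesCooperativeParse() are hypothetical names, and the flags stand in for the results of the factory methods named in the comments. Cases the text does not list, such as an unapportioned load with a single thread, are assumed here to mean that cooperative parse is not used.

```cpp
#include <cassert>

// Hypothetical summary of what the factories report for one load.
struct LoadTraits {
    bool sourceApportionable;   // result of isSourceApportionable()
    bool parserApportionable;   // result of isParserApportionable()
    bool chunkerApportionable;  // result of isChunkerApportionable()
    bool unfenced;              // load runs in unfenced mode
};

// Planning time: apportion if the source supports it and either the
// parser or its chunker is apportionable.
bool apportionsLoad(const LoadTraits& t)
{
    return t.sourceApportionable &&
           (t.parserApportionable || t.chunkerApportionable);
}

// Execution time: decide whether cooperative parse is used.
bool usesCooperativeParse(const LoadTraits& t, int availableThreads)
{
    assert(availableThreads >= 1);
    if (!t.unfenced) return false;  // not supported in fenced mode
    if (!apportionsLoad(t)) return availableThreads > 1;
    if (availableThreads == 1) return !t.parserApportionable;
    // Apportioned with several threads: requires an apportionable chunker.
    // Assumption: any case the text does not list falls through to false.
    return t.chunkerApportionable;
}
```

For example, a load whose parser and chunker are both apportionable uses cooperative parse when several threads are available but not when only one is, because in the single-thread case the parser can handle its portion directly.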
Executing apportioned, cooperative loads
If a load uses both apportioned load and cooperative parse, Vertica uses the SourceFactory to break the input into portions. It then assigns the portions to execution nodes. See How Vertica Apportions a Load.
On the execution node, Vertica calls the chunker's alignPortion() method to align the input with portion boundaries. (This step is skipped for the first portion, which by definition is already aligned at the beginning.) This step is necessary because a parser using apportioned load sometimes has to read beyond the end of its portion to complete a record, so the chunker for the following portion must find the point where that record ends.
After aligning the portion, Vertica calls the chunker's process() method repeatedly. See How Vertica Divides a Load.
The chunks found by the chunker are then sent to the parser's process() method for processing in the usual way.