User-defined source
A user-defined source allows you to process a source of data using a method that is not built into Vertica. For example, you can write a user-defined source to access the data from an HTTP source using cURL. While a given COPY statement can specify only one user-defined source, the source function itself can pull data from multiple sources.
The UDSource class acquires data from an external source. It reads data from an input stream and produces an output stream to be filtered and parsed. If you implement a UDSource, you must also implement a corresponding SourceFactory.
1 - UDSource class
You can subclass the UDSource class when you need to load data from a source type that COPY does not already support.
Each instance of your UDSource subclass reads from a single data source. Examples of a single data source are a single file or the results of a single function call to a RESTful web application.
UDSource methods
Your UDSource subclass must override process() or processWithMetadata():
Note
processWithMetadata() is available only for user-defined extensions (UDxs) written in the C++ programming language.
- process() reads the raw input stream as one large file. If there are any errors or failures, the entire load fails.
- processWithMetadata() is useful when the data source has metadata about record boundaries available in some structured format that's separate from the data payload. With this interface, the source emits a record length for each record in addition to the data.
By implementing processWithMetadata() instead of process() in each phase, you can retain this record length metadata throughout the load stack, which enables a more efficient parse that can recover from errors on a per-message basis, rather than a per-file or per-source basis. KafkaSource and the Kafka parsers (KafkaAvroParser, KafkaJSONParser, and KafkaParser) use this mechanism to support per-Kafka-message rejections when individual Kafka messages cannot be parsed.
Note
To implement processWithMetadata(), you must override useSideChannel() to return true.
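The difference between the two interfaces is easiest to see in a toy model. The following C++ sketch is self-contained and does not use the Vertica SDK; RecordStream, emitWithMetadata(), and parseRecords() are hypothetical names. The source side emits the payload plus one length per record (standing in for the side-channel metadata), and the parser side uses those lengths to reject a bad record individually instead of failing the whole stream.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <vector>

// Hypothetical stand-in for a payload plus side-channel record lengths.
// This is an illustration only, not the Vertica SDK API.
struct RecordStream {
    std::string payload;               // concatenated record bytes
    std::vector<std::size_t> lengths;  // one length per record
};

// Source side: concatenate records and remember each record's length.
RecordStream emitWithMetadata(const std::vector<std::string>& records) {
    RecordStream out;
    for (const std::string& r : records) {
        out.payload += r;
        out.lengths.push_back(r.size());
    }
    return out;
}

// Parser side: use the lengths to slice out each record. A record that
// fails to parse (here: anything that is not all digits) is rejected on
// its own, and the remaining records are still accepted.
void parseRecords(const RecordStream& in,
                  std::vector<long>& accepted,
                  std::vector<std::string>& rejected) {
    std::size_t pos = 0;
    for (std::size_t len : in.lengths) {
        assert(pos + len <= in.payload.size());
        std::string record = in.payload.substr(pos, len);
        pos += len;
        bool ok = !record.empty();
        for (char c : record) {
            if (c < '0' || c > '9') { ok = false; break; }
        }
        if (ok) accepted.push_back(std::stol(record));
        else rejected.push_back(record);
    }
}
```

In this model, a parser that received only the payload "12x7305" would have no way to tell where the bad record ends, which is the situation process() is in when it treats the input as one large file.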
Additionally, you can override the other UDSource class methods.
Source execution
The following sections detail the execution sequence each time a user-defined source is called. The description assumes that the source overrides the process() method.
Setting Up
COPY calls setup() before the first time it calls process(). Use setup() to perform any steps needed to access the data source, such as establishing network connections, opening files, and other tasks that must be done before the UDSource can read data from the data source. Your object might be destroyed and re-created during use, so make sure that it is restartable.
Processing a Source
COPY calls process() repeatedly during query execution to read data and write it to the DataBuffer passed as a parameter. This buffer is then passed to the first filter.
If the source runs out of input or fills the output buffer, it must return the value StreamState.OUTPUT_NEEDED. When Vertica gets this return value, it calls the method again. This second call occurs after the next stage in the data-load process has processed the output buffer. Returning StreamState.DONE indicates that all of the data from the source has been read.
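The calling contract can be modeled in a few lines. This C++ sketch uses simplified stand-ins (a SketchBuffer with buf, size, and offset fields and a SketchState enum), not the SDK's DataBuffer and StreamState; InMemorySource and drain() are hypothetical names. process() copies as much as fits, returns OUTPUT_NEEDED when the buffer is full, and returns DONE once the last byte has been written; drain() plays the role of the caller that empties the buffer and calls again.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstring>
#include <string>
#include <utility>

// Simplified stand-ins for illustration only (not the Vertica SDK types).
enum class SketchState { OUTPUT_NEEDED, DONE };

struct SketchBuffer {
    char* buf;           // memory owned by the caller
    std::size_t size;    // capacity of buf
    std::size_t offset;  // bytes already written
};

// A source that serves bytes from an in-memory string, one buffer at a time.
class InMemorySource {
public:
    explicit InMemorySource(std::string data) : data_(std::move(data)) {}

    SketchState process(SketchBuffer& out) {
        std::size_t room = out.size - out.offset;
        std::size_t n = std::min(room, data_.size() - pos_);
        std::memcpy(out.buf + out.offset, data_.data() + pos_, n);
        out.offset += n;
        pos_ += n;
        // All input consumed: tell the caller this source is finished.
        if (pos_ == data_.size()) return SketchState::DONE;
        // Buffer is full: ask the caller to drain it and call again.
        return SketchState::OUTPUT_NEEDED;
    }

private:
    std::string data_;
    std::size_t pos_ = 0;
};

// Caller-side loop: call process() until DONE, draining the buffer after
// each call the way the next load stage would.
std::string drain(InMemorySource& src, std::size_t bufSize, int& calls) {
    assert(bufSize > 0);  // a zero-size buffer could never make progress
    std::string collected;
    std::string storage(bufSize, '\0');
    SketchBuffer b{&storage[0], bufSize, 0};
    calls = 0;
    SketchState st;
    do {
        b.offset = 0;
        st = src.process(b);
        ++calls;
        collected.append(b.buf, b.offset);
    } while (st == SketchState::OUTPUT_NEEDED);
    return collected;
}
```

For a 10-byte input and a 4-byte buffer, drain() calls process() three times (4, 4, and 2 bytes).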
The user can cancel the load operation, which aborts reading.
Tearing Down
COPY calls destroy() after the last time that process() is called. This method frees any resources reserved by the setup() or process() methods, such as file handles or network connections that the setup() method allocated.
Accessors
A source can define two accessors, getSize() and getUri().
COPY might call getSize() to estimate the number of bytes of data to be read before calling process(). This value is an estimate only and is used to indicate the file size in the LOAD_STREAMS table. Because Vertica can call this method before calling setup(), getSize() must not rely on any resources allocated by setup().
This method should not leave any resources open. For example, do not save any file handles opened by getSize() for use by the process() method. Doing so can exhaust the available resources, because Vertica calls getSize() on all instances of your UDSource subclass before any data is loaded. If many data sources are being opened, these open file handles could use up the system's supply of file handles, leaving none available to perform the actual data load.
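One way to satisfy both constraints is to ask the file system for metadata instead of opening the file. The following C++17 sketch (estimateSize() is a hypothetical helper, not an SDK method) uses std::filesystem::file_size with an error_code, so it opens no handle, needs nothing from setup(), and reports 0 when the size cannot be determined.

```cpp
#include <cassert>
#include <cstdint>
#include <filesystem>
#include <string>
#include <system_error>

// Hypothetical size estimate in the spirit of getSize(): it queries file
// system metadata rather than opening the file, so no handle is left open
// and nothing allocated by setup() is required.
std::uintmax_t estimateSize(const std::string& path) {
    std::error_code ec;
    std::uintmax_t n = std::filesystem::file_size(path, ec);
    // The value is only an estimate; report 0 if the size is unknown.
    return ec ? 0 : n;
}
```

Reporting 0 for an unknown size is an assumption of this sketch; because the value is only an estimate, a source could reasonably choose a different fallback.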
Vertica calls getUri() during execution to update status information about which resources are currently being loaded. It returns the URI of the data source being read by this UDSource.
API
The UDSource API provides the following methods for extension by subclasses:
2 - SourceFactory class
If you write a source, you must also write a source factory. Your subclass of the SourceFactory class is responsible for:
- Performing the initial validation of the parameters passed to your UDSource.
- Setting up any data structures your UDSource instances need to perform their work. This information can include recording which nodes will read which data source.
- Creating one instance of your UDSource subclass for each data source (or portion thereof) that your function reads on each host.
The simplest source factory creates one UDSource instance per data source per executor node. You can also use multiple concurrent UDSource instances on each node. This behavior is called concurrent load. To support both options, SourceFactory has two versions of the method that creates the sources. You must implement exactly one of them.
Source factories are singletons. Your subclass must be stateless, with no fields containing data. The subclass also must not modify any global variables.
SourceFactory methods
The SourceFactory class defines several methods. Your class must override prepareUDSources(); it may override the other methods.
Setting up
Vertica calls plan() once on the initiator node to perform the following tasks:
- Check the parameters the user supplied to the function call in the COPY statement and provide error messages if there are any issues. You can read the parameters by getting a ParamReader object from the instance of ServerInterface passed into the plan() method.
- Decide which hosts in the cluster will read the data source. How you divide up the work depends on the source your function is reading. Some sources can be split across many hosts, such as a source that reads data from many URLs. Others, such as an individual local file on a host's file system, can be read only by a single specified host.
  You store the list of hosts to read the data source by calling the setTargetNodes() method on the NodeSpecifyingPlanContext object. This object is passed into your plan() method.
- Store any information that the individual hosts need to process the data sources in the NodeSpecifyingPlanContext instance passed to the plan() method. For example, you could store assignments that tell each host which data sources to process. The plan() method runs only on the initiator node, and the prepareUDSources() method runs on each host reading from a data source. Therefore, this object is the only means of communication between them.
  You store data in the NodeSpecifyingPlanContext by getting a ParamWriter object from the getWriter() method. You then write parameters by calling methods on the ParamWriter, such as setString().
Note
ParamWriter offers the ability to store only simple data types. For complex types, you must serialize the data in some manner and store it as a string or long string.
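The plan/prepare split can be sketched without the SDK. In this C++ model, a std::map of strings stands in for the plan context (echoing the restriction that only simple types can be stored), assignFiles() plays the part of plan() by assigning files to nodes round-robin and serializing each node's list as a comma-separated string, and filesForNode() plays the part of prepareUDSources() by reading the list back. All names are hypothetical.

```cpp
#include <cassert>
#include <cstddef>
#include <map>
#include <sstream>
#include <string>
#include <vector>

// Hypothetical stand-in for the plan context: string values only,
// mirroring the restriction that ParamWriter stores simple types.
using PlanParams = std::map<std::string, std::string>;

// Initiator side (like plan()): assign files to nodes round-robin and
// serialize each node's list as one comma-separated string.
void assignFiles(const std::vector<std::string>& files,
                 const std::vector<std::string>& nodes,
                 PlanParams& params) {
    assert(!nodes.empty());
    for (std::size_t i = 0; i < files.size(); ++i) {
        std::string& slot = params[nodes[i % nodes.size()]];
        if (!slot.empty()) slot += ',';
        slot += files[i];
    }
}

// Executor side (like prepareUDSources()): read back this node's
// assignment; each returned path would become one source instance.
std::vector<std::string> filesForNode(const PlanParams& params,
                                      const std::string& node) {
    std::vector<std::string> out;
    auto it = params.find(node);
    if (it == params.end()) return out;
    std::stringstream ss(it->second);
    std::string item;
    while (std::getline(ss, item, ',')) out.push_back(item);
    return out;
}
```

Comma-joining is the simplest serialization that fits a string parameter, but it assumes paths never contain commas; a real factory would pick a format that cannot collide with its data.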
Creating sources
Vertica calls prepareUDSources() on all hosts that the plan() method selected to load data. This call instantiates and returns a list of UDSource subclass instances. If you are not using concurrent load, return one UDSource for each of the sources that the host is assigned to process. If you are using concurrent load, use the version of the method that takes an ExecutorPlanContext as a parameter, and return as many sources as you can use. Your factory must implement exactly one of these methods.
Note
In the C++ API, the function that supports concurrent load is named prepareUDSourcesExecutor(). In the Java API, the class provides two overloads of prepareUDSources().
For concurrent load, you can find out how many threads are available on the node to run UDSource instances by calling getLoadConcurrency() on the ExecutorPlanContext that is passed in.
Defining parameters
Implement getParameterTypes() to define the names and types of parameters that your source uses. Vertica uses this information to warn callers about unknown or missing parameters. Vertica ignores unknown parameters and uses default values for missing parameters. While you should define the types and parameters for your function, you are not required to override this method.
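The warn, ignore, and default behavior described above can be modeled as follows. This C++17 sketch is not how Vertica implements it internally; resolveParams() and ResolvedParams are hypothetical, and the declared parameters (file, and nodes with a placeholder default of "initiator") are borrowed loosely from the FileSource example for illustration.

```cpp
#include <cassert>
#include <map>
#include <string>
#include <vector>

// Hypothetical model of the documented behavior: declared parameters carry
// a default; unknown supplied parameters are ignored with a warning;
// missing declared parameters fall back to their default with a warning.
struct ResolvedParams {
    std::map<std::string, std::string> values;
    std::vector<std::string> warnings;
};

ResolvedParams resolveParams(
        const std::map<std::string, std::string>& declaredDefaults,
        const std::map<std::string, std::string>& supplied) {
    ResolvedParams r;
    for (const auto& [name, value] : supplied) {
        if (declaredDefaults.count(name)) r.values[name] = value;
        else r.warnings.push_back("unknown parameter: " + name);
    }
    for (const auto& [name, def] : declaredDefaults) {
        if (!r.values.count(name)) {
            r.values[name] = def;
            r.warnings.push_back("missing parameter, using default: " + name);
        }
    }
    return r;
}
```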
Requesting threads for concurrent load
When a source factory creates sources on an executor node, by default, it creates one thread per source. If your sources can use multiple threads, implement getDesiredThreads(). Vertica calls this method before it calls prepareUDSources(), so you can also use it to decide how many sources to create. Return the number of threads your factory can use for sources. The maximum number of available threads is passed in, so you can take that into account. The value your method returns is a hint, not a guarantee; each executor node determines the number of threads to allocate. The FilePortionSourceFactory example implements this method; see C++ example: concurrent load.
You can allow your source to have control over parallelism, meaning that it can divide a single input into multiple load streams, by implementing isSourceApportionable(). Returning true from this method does not guarantee that the source will apportion the load. However, returning false indicates that it will not try to do so. See Apportioned load for more information.
Often, a SourceFactory that implements getDesiredThreads() also uses apportioned load. However, using apportioned load is not a requirement. A source reading from Kafka streams, for example, could use multiple threads without apportioning.
API
The SourceFactory API provides the following methods for extension by subclasses:
After creating your SourceFactory, you must register it with the RegisterFactory macro.
3 - C++ example: CurlSource
The CurlSource example allows you to use cURL to open and read in a file over HTTP. The example provided is part of /opt/vertica/sdk/examples/SourceFunctions/cURL.cpp.
Source implementation
This example uses the helper library available in /opt/vertica/sdk/examples/HelperLibraries/.
CurlSource loads the data in chunks. If the source reaches the end of the file (an EndOfFile marker), the process() method returns DONE. Otherwise, the method returns OUTPUT_NEEDED, and Vertica calls it again to process another chunk of data. The functions included in the helper library (such as url_fread() and url_fopen()) are based on examples that come with the libcurl library. For an example, see http://curl.haxx.se/libcurl/c/fopen.html.
The setup() function opens a file handle and the destroy() function closes it. Both use functions from the helper library.
Factory implementation
CurlSourceFactory produces CurlSource instances.
4 - C++ example: concurrent load
The FilePortionSource example demonstrates the use of concurrent load. This example is a refinement of the FileSource example. Each input file is divided into portions and distributed to FilePortionSource instances. The source accepts a list of offsets at which to break the input into portions; if offsets are not provided, the source divides the input dynamically.
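Dividing a file at user-supplied offsets is straightforward arithmetic: each offset starts a portion that ends at the next offset or at the end of the file. The following C++ sketch (Portion, parseOffsets(), and portionsFromOffsets() are hypothetical names, not the example's actual code) shows that step for an offsets string such as '0,380000,820000'. It assumes sorted offsets and does not address records that straddle a portion boundary, which a real apportioned load must handle.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <sstream>
#include <string>
#include <vector>

struct Portion {
    std::uint64_t offset;  // first byte of the portion
    std::uint64_t size;    // number of bytes in the portion
};

// Parse a comma-separated offsets string such as "0,380000,820000".
std::vector<std::uint64_t> parseOffsets(const std::string& csv) {
    std::vector<std::uint64_t> out;
    std::stringstream ss(csv);
    std::string item;
    while (std::getline(ss, item, ',')) out.push_back(std::stoull(item));
    return out;
}

// Each offset starts a portion that runs to the next offset, or to the end
// of the file for the last one. Offsets are assumed to be sorted.
std::vector<Portion> portionsFromOffsets(
        const std::vector<std::uint64_t>& offsets, std::uint64_t fileSize) {
    std::vector<Portion> portions;
    for (std::size_t i = 0; i < offsets.size(); ++i) {
        std::uint64_t start = offsets[i];
        std::uint64_t end = (i + 1 < offsets.size()) ? offsets[i + 1] : fileSize;
        assert(start <= end && end <= fileSize);
        if (end > start) portions.push_back({start, end - start});
    }
    return portions;
}
```

With a 1,000,000-byte file, '0,380000,820000' yields portions of 380000, 440000, and 180000 bytes.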
Concurrent load is handled in the factory, so this discussion focuses on FilePortionSourceFactory. The full code for the example is located in /opt/vertica/sdk/examples/ApportionLoadFunctions. The distribution also includes a Java version of this example.
Loading and using the example
Load and use the FilePortionSource example as follows.
=> CREATE LIBRARY FilePortionLib AS '/home/dbadmin/FP.so';
=> CREATE SOURCE FilePortionSource AS LANGUAGE 'C++'
-> NAME 'FilePortionSourceFactory' LIBRARY FilePortionLib;
=> COPY t WITH SOURCE FilePortionSource(file='g1/*.dat', nodes='initiator,e0,e1', offsets = '0,380000,820000');
=> COPY t WITH SOURCE FilePortionSource(file='g2/*.dat', nodes='e0,e1,e2', local_min_portion_size = 2097152);
Implementation
Concurrent load affects the source factory in two places, getDesiredThreads() and prepareUDSourcesExecutor().
getDesiredThreads()
The getDesiredThreads() member function determines the number of threads to request. Vertica calls this member function on each executor node before calling prepareUDSourcesExecutor().
The function begins by breaking an input file path, which might be a glob, into individual paths. This discussion omits those details. If apportioned load is not being used, then the function allocates one source per file.
If the source can be apportioned, then getDesiredThreads() uses the offsets that were passed as arguments to the factory to divide the file into portions. It then allocates portions to available nodes. This function does not actually assign sources directly; this work is done to determine how many threads to request.
The function now has all the portions and thus the number of portions:
If offsets were not provided, the function divides the file into portions dynamically, one portion per thread. This discussion omits the details of this computation. There is no point in requesting more threads than are available, so the function calls getMaxAllowedThreads() on the PlanContext (an argument to the function) to set an upper bound:
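The upper-bound logic can be illustrated with a hedged C++17 sketch. It is not the FilePortionSourceFactory computation, which this page does not reproduce: desiredPortions() assumes one portion per minimum portion size (compare the local_min_portion_size argument in the COPY example), at least one portion, and no more than the maximum number of allowed threads; splitEvenly() then divides the file into that many nearly equal portions. Both function names are hypothetical.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <vector>

struct Portion {
    std::uint64_t offset;  // first byte of the portion
    std::uint64_t size;    // number of bytes in the portion
};

// Hypothetical thread request: one portion per minPortionSize bytes
// (rounded up), at least one, and never more than the allowed maximum.
std::uint64_t desiredPortions(std::uint64_t fileSize,
                              std::uint64_t minPortionSize,
                              std::uint64_t maxAllowedThreads) {
    assert(minPortionSize > 0 && maxAllowedThreads > 0);
    std::uint64_t wanted = (fileSize + minPortionSize - 1) / minPortionSize;
    return std::clamp<std::uint64_t>(wanted, 1, maxAllowedThreads);
}

// Split the file into n nearly equal portions; the first fileSize % n
// portions get one extra byte.
std::vector<Portion> splitEvenly(std::uint64_t fileSize, std::uint64_t n) {
    assert(n > 0);
    std::vector<Portion> out;
    std::uint64_t base = fileSize / n, extra = fileSize % n, pos = 0;
    for (std::uint64_t i = 0; i < n; ++i) {
        std::uint64_t len = base + (i < extra ? 1 : 0);
        out.push_back({pos, len});
        pos += len;
    }
    return out;
}
```

For a 10 MiB file with a 2 MiB minimum, the sketch wants five portions; if only four threads were allowed, it would request four.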
See the full example for the details of how this function divides the file into portions.
This function uses the vt_createFuncObject template to create objects. Vertica calls the destructors of returned objects created with vt_createFuncObject, but it does not call destructors for other objects, such as vectors. You must call these destructors yourself to avoid memory leaks. In this example, these calls are made in prepareUDSourcesExecutor().
prepareUDSourcesExecutor()
The prepareUDSourcesExecutor() member function, like getDesiredThreads(), has separate blocks of code depending on whether offsets are provided. In both cases, the function breaks input into portions and creates UDSource instances for them.
If the function is called with offsets, prepareUDSourcesExecutor() calls prepareCustomizedPortions(). This function follows.
If prepareUDSourcesExecutor() is called without offsets, then it must decide how many portions to create.
The base case is to use one portion per source. However, if extra threads are available, the function divides the input into more portions so that a source can process them concurrently. Then prepareUDSourcesExecutor() calls prepareGeneratedPortions() to create the portions. This function begins by calling getLoadConcurrency() on the plan context to find out how many threads are available.
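The idea of spending extra threads on more portions can be sketched as a small allocation function. This C++ sketch is an assumption about one reasonable policy, not the code of prepareGeneratedPortions(): each source starts with one portion, and any threads beyond the number of sources are handed out round-robin. portionsPerSource() is a hypothetical name.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical allocation: every source gets one portion; if the load
// concurrency exceeds the number of sources, the extra threads are spread
// round-robin so that those sources are split into more portions.
std::vector<std::size_t> portionsPerSource(std::size_t numSources,
                                           std::size_t loadConcurrency) {
    std::vector<std::size_t> counts(numSources, 1);
    if (numSources == 0 || loadConcurrency <= numSources) return counts;
    std::size_t extra = loadConcurrency - numSources;
    for (std::size_t i = 0; i < extra; ++i) counts[i % numSources] += 1;
    return counts;
}
```

With three sources and a load concurrency of 8, the sketch yields 3, 3, and 2 portions; with more sources than threads, every source simply keeps one portion.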
See the source code for the full implementation of this example.
5 - Java example: FileSource
The example shown in this section is a simple UDL Source function named FileSource. This function loads data from files stored on the host's file system (similar to the standard COPY statement). To call FileSource, you must supply a parameter named file that contains the absolute path to one or more files on the host file system. You can specify multiple files as a comma-separated list.
The FileSource function also accepts an optional parameter, named nodes, that indicates which nodes should load the files. If you do not supply this parameter, the function defaults to loading data on the initiator node only. Because this example is simple, the nodes load only the files from their own file systems. Any files in the file parameter must exist on all of the hosts in the nodes parameter. The FileSource UDSource attempts to load all of the files in the file parameter on all of the hosts in the nodes parameter.
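The parameter rules for FileSource can be summarized in code. The following C++ sketch is a hypothetical analogue, not the Java example's code: readArgs() requires file, splits both lists on commas (trimming spaces), and falls back to the initiator node when nodes is absent. The initiator node name is passed in because this sketch has no server interface to ask.

```cpp
#include <cassert>
#include <map>
#include <sstream>
#include <stdexcept>
#include <string>
#include <vector>

// Split a comma-separated parameter value, trimming surrounding spaces
// and skipping empty entries.
std::vector<std::string> splitList(const std::string& csv) {
    std::vector<std::string> out;
    std::stringstream ss(csv);
    std::string item;
    while (std::getline(ss, item, ',')) {
        auto first = item.find_first_not_of(' ');
        if (first == std::string::npos) continue;
        auto last = item.find_last_not_of(' ');
        out.push_back(item.substr(first, last - first + 1));
    }
    return out;
}

struct FileSourceArgs {
    std::vector<std::string> files;
    std::vector<std::string> nodes;
};

// Hypothetical argument handling modeled on the FileSource description:
// "file" is required; "nodes" defaults to the initiator node.
FileSourceArgs readArgs(const std::map<std::string, std::string>& params,
                        const std::string& initiatorNode) {
    auto f = params.find("file");
    if (f == params.end() || splitList(f->second).empty())
        throw std::invalid_argument("FileSource requires a 'file' parameter");
    FileSourceArgs args;
    args.files = splitList(f->second);
    auto n = params.find("nodes");
    args.nodes = (n != params.end()) ? splitList(n->second)
                                     : std::vector<std::string>{initiatorNode};
    return args;
}
```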
Generating files
You can use the following Python script to generate files and distribute them to hosts in your Vertica cluster. With these files, you can experiment with the example UDSource function. Running the script requires passwordless SSH logins to copy the files to the other hosts. Therefore, you must run the script using the database administrator account on one of your database hosts.
You call this script by giving it a comma-separated list of hosts to receive the files and a comma-separated list of absolute paths of files to generate. For example:
This script generates files that contain a thousand rows of columns delimited with the pipe character (|). These columns contain an index value, a set of random words, and the node for which the file was generated, as shown in the following output sample:
0|megabits embanks|v_vmart_node0001
1|unneatly|v_vmart_node0001
2|self-precipitation|v_vmart_node0001
3|antihistamine scalados Vatter|v_vmart_node0001
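The generator script is not reproduced here, but the file format is simple enough to sketch. This C++ code is a stand-in for illustration only (it builds a string rather than writing and distributing files over SSH): makeRow() builds one pipe-delimited row of index, one to three random words, and node name, and makeFile() builds the requested number of rows. The word list, which reuses words from the sample above, and the fixed seed are arbitrary choices.

```cpp
#include <cassert>
#include <cstddef>
#include <random>
#include <string>
#include <vector>

// Build one pipe-delimited row: index|random words|node name.
std::string makeRow(std::size_t index, std::mt19937& rng,
                    const std::vector<std::string>& words,
                    const std::string& node) {
    assert(!words.empty());
    std::uniform_int_distribution<std::size_t> count(1, 3);
    std::uniform_int_distribution<std::size_t> pick(0, words.size() - 1);
    std::string row = std::to_string(index) + "|";
    std::size_t n = count(rng);
    for (std::size_t i = 0; i < n; ++i) {
        if (i > 0) row += ' ';
        row += words[pick(rng)];
    }
    return row + "|" + node;
}

// Generate `rows` rows for one node, one row per line.
std::string makeFile(std::size_t rows, const std::string& node,
                     unsigned seed = 42) {
    std::mt19937 rng(seed);
    const std::vector<std::string> words = {"megabits", "embanks", "unneatly",
                                            "antihistamine", "scalados"};
    std::string out;
    for (std::size_t i = 0; i < rows; ++i)
        out += makeRow(i, rng, words, node) + "\n";
    return out;
}
```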
Loading and using the example
Load and use the FileSource UDSource as follows:
Source implementation
The following code shows the source of the FileSource class that reads a file from the host file system. The constructor, which is called by FileSourceFactory.prepareUDSources(), gets the absolute path for the file containing the data to be read. The setup() method opens the file and the destroy() method closes it. The process() method reads from the file into a buffer provided by the instance of the DataBuffer class passed to it as a parameter. If the read operation filled the output buffer, it returns OUTPUT_NEEDED. This value tells Vertica to call the method again after the next stage of the load has processed the output buffer. If the read did not fill the output buffer, then process() returns DONE to indicate it has finished processing the data source.
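For comparison with the description above, here is a C++ analogue of the same setup/process/destroy pattern. It is a sketch, not the Java FileSource code: SketchFileSource, SketchBuffer, and SketchState are hypothetical stand-ins for the SDK classes. A full buffer returns OUTPUT_NEEDED and a short read returns DONE; as an addition not described in the original, a short read caused by an I/O error is reported as an exception instead of being treated as end of file.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdio>
#include <stdexcept>
#include <string>
#include <utility>

// Simplified stand-ins, not the Vertica SDK types.
enum class SketchState { OUTPUT_NEEDED, DONE };
struct SketchBuffer {
    char* buf;
    std::size_t size;
    std::size_t offset;
};

class SketchFileSource {
public:
    explicit SketchFileSource(std::string path) : path_(std::move(path)) {}

    // setup(): acquire the file handle before the first process() call.
    void setup() {
        file_ = std::fopen(path_.c_str(), "rb");
        if (!file_) throw std::runtime_error("cannot open " + path_);
    }

    // process(): fill as much of the buffer as possible.
    SketchState process(SketchBuffer& out) {
        assert(file_ != nullptr);
        std::size_t room = out.size - out.offset;
        std::size_t n = std::fread(out.buf + out.offset, 1, room, file_);
        out.offset += n;
        if (n < room && std::ferror(file_))
            throw std::runtime_error("read error on " + path_);
        // Full buffer: there may be more data, so ask to be called again.
        // Short read: end of file reached, so this source is done.
        return n == room ? SketchState::OUTPUT_NEEDED : SketchState::DONE;
    }

    // destroy(): release what setup() acquired.
    void destroy() {
        if (file_) { std::fclose(file_); file_ = nullptr; }
    }

private:
    std::string path_;
    std::FILE* file_ = nullptr;
};
```

If the file size is an exact multiple of the buffer size, the last full read returns OUTPUT_NEEDED and the following call reads zero bytes and returns DONE, which is harmless.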
Factory implementation
The following code is a modified version of the example Java UDSource function provided in the Java UDx support package. You can find the full example in /opt/vertica/sdk/examples/JavaUDx/UDLFuctions/com/vertica/JavaLibs/FileSourceFactory.java. Its override of the plan() method verifies that the user supplied the required file parameter. If the user also supplied the optional nodes parameter, this method verifies that the nodes exist in the Vertica cluster. If there is a problem with either parameter, the method throws an exception to return an error to the user. If there are no issues with the parameters, the plan() method stores their values in the plan context object.
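The validation flow of plan() can be sketched the same way. In this hypothetical C++ analogue (planFileSource() and the map-based PlanParams are stand-ins, not the Java API), a missing file parameter or an unknown node raises an exception; otherwise the values are recorded in the plan context as strings for the executor nodes to read. The node names in the usage are illustrative.

```cpp
#include <algorithm>
#include <cassert>
#include <map>
#include <stdexcept>
#include <string>
#include <vector>

// Hypothetical stand-in for the plan context's string parameters.
using PlanParams = std::map<std::string, std::string>;

// Modeled on the description of FileSourceFactory.plan(): require "file",
// check that every requested node exists in the cluster, then record the
// values for the executor nodes.
void planFileSource(const std::map<std::string, std::string>& userParams,
                    const std::vector<std::string>& clusterNodes,
                    const std::vector<std::string>& requestedNodes,
                    PlanParams& planContext) {
    auto file = userParams.find("file");
    if (file == userParams.end() || file->second.empty())
        throw std::invalid_argument("You must supply a 'file' parameter");
    for (const std::string& node : requestedNodes) {
        if (std::find(clusterNodes.begin(), clusterNodes.end(), node) ==
            clusterNodes.end())
            throw std::invalid_argument("Node not found in cluster: " + node);
    }
    planContext["file"] = file->second;
    std::string joined;
    for (const std::string& node : requestedNodes) {
        if (!joined.empty()) joined += ',';
        joined += node;
    }
    planContext["nodes"] = joined;
}
```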