UDParser class

You can subclass the UDParser class when you need to parse data that is in a format that the COPY statement's native parser cannot handle.

During parser execution, Vertica always calls three methods: setup(), process(), and destroy(). It might also call getRejectedRecord().

UDParser constructor

The UDParser class performs important initialization required by all subclasses, including initializing the StreamWriter object used by the parser. Therefore, your constructor must call super().

UDParser methods

Your UDParser subclass must override process() or processWithMetadata():

  • process() reads the raw input stream as one large file. If there are any errors or failures, the entire load fails. You can implement process() when the upstream source or filter implements processWithMetadata(), but it might result in parsing errors.

  • processWithMetadata() is useful when the data source has metadata about record boundaries available in some structured format that's separate from the data payload. With this interface, the source emits a record length for each record in addition to the data.

    By implementing processWithMetadata() instead of process() in each phase, you can retain this record length metadata throughout the load stack, which enables a more efficient parse that can recover from errors on a per-message basis, rather than a per-file or per-source basis. KafkaSource and Kafka parsers (KafkaAvroParser, KafkaJSONParser, and KafkaParser) use this mechanism to support per-Kafka-message rejections when individual Kafka messages are corrupted.
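To illustrate why record-length metadata allows per-record recovery, the following Python sketch cuts a payload into records using a list of lengths and rejects only the records that fail to parse. It does not use the Vertica SDK: split_by_lengths() and parse_records() are hypothetical helpers, and JSON is an assumed payload format chosen for the example.

```python
import json

def split_by_lengths(payload, lengths):
    """Cut payload (bytes) into records using the per-record lengths."""
    records, offset = [], 0
    for length in lengths:
        records.append(payload[offset:offset + length])
        offset += length
    return records

def parse_records(payload, lengths):
    """Parse each record on its own, so one bad record does not affect the rest."""
    accepted, rejected = [], []
    for record in split_by_lengths(payload, lengths):
        try:
            accepted.append(json.loads(record))
        except ValueError as err:  # json.JSONDecodeError is a ValueError subclass
            rejected.append((record, str(err)))
    return accepted, rejected

# Three hypothetical messages; the second one is corrupted.
messages = [b'{"id": 1}', b'{"id": oops', b'{"id": 3}']
accepted, rejected = parse_records(b"".join(messages), [len(m) for m in messages])
```

Without the lengths, the parser would have to guess where the corrupted message ends, which is why a parser that only implements process() might fail the whole load instead.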

Additionally, you must override getRejectedRecord() to return information about rejected records.

Optionally, you can override the other UDParser class methods.

Parser execution

The following sections detail the execution sequence each time a user-defined parser is called. The sequence described here assumes a parser that overrides the process() method.

Setting up

COPY calls setup() before the first time it calls process(). Use setup() to perform any initial setup tasks that your parser needs to parse data. This setup includes retrieving parameters from the class context structure or initializing data structures for use during parsing. Your object might be destroyed and re-created during use, so make sure that your object is restartable.

Parsing

COPY calls process() repeatedly during query execution. Vertica passes this method a buffer of data to parse into columns and rows and one of the following input states defined by InputState:

  • OK: currently at the start of or in the middle of a stream

  • END_OF_FILE: no further data is available.

  • END_OF_CHUNK: the current data ends on a record boundary and the parser should consume all of it before returning. This input state only occurs when using a chunker.

  • START_OF_PORTION: the input does not start at the beginning of a source. The parser should find the first end-of-record mark. This input state only occurs when using apportioned load. You can use the getPortion() method to access the offset and size of the portion.

  • END_OF_PORTION: the source has reached the end of its portion. The parser should finish processing the last record it started and advance no further. This input state only occurs when using apportioned load.
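As a rough illustration of the START_OF_PORTION case, the following Python sketch finds where parsing should begin in a buffer. The InputState enum here is a local stand-in that only mirrors the state names listed above, first_record_start() is a hypothetical helper, and a newline record terminator is assumed.

```python
from enum import Enum, auto

class InputState(Enum):
    """Local stand-in mirroring the input states listed above (not the SDK type)."""
    OK = auto()
    END_OF_FILE = auto()
    END_OF_CHUNK = auto()
    START_OF_PORTION = auto()
    END_OF_PORTION = auto()

def first_record_start(data, state, terminator=b"\n"):
    """Return the offset where parsing should begin, or None if more data is needed."""
    if state is not InputState.START_OF_PORTION:
        return 0  # the buffer already starts on a record boundary
    end = data.find(terminator)
    if end == -1:
        return None  # no end-of-record mark in this buffer yet
    return end + len(terminator)  # skip the partial record owned by the previous portion
```

For example, a portion that begins with the tail of a row, such as b"tial\nrow2\n", would start parsing at offset 5.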

The parser must reject any data that it cannot parse, so that Vertica can report the rejection and write the rejected data to files.

The process() method must parse as much data as it can from the input buffer. The buffer might not end on a row boundary. Therefore, it might have to stop parsing in the middle of a row of input and ask for more data. The input can contain null bytes, if the source file contains them, and is not automatically null-terminated.

A parser has an associated StreamWriter object, which performs the actual writing of the data. When your parser extracts a column value, it uses one of the type-specific methods on StreamWriter to write that value to the output stream. See Writing Data for more information about these methods.

A single call to process() might write several rows of data. When your parser finishes processing a row of data, it must call next() on its StreamWriter to advance the output stream to a new row. (Usually a parser finishes processing a row because it encounters an end-of-row marker.)

When your process() method reaches the end of the buffer, it tells Vertica its current state by returning one of the following values defined by StreamState:

  • INPUT_NEEDED: the parser has reached the end of the buffer and needs more data to parse.

  • DONE: the parser has reached the end of the input data stream.

  • REJECT: the parser has rejected the last row of data it read (see Rejecting Rows).
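The parsing loop described above can be sketched in Python as follows. This is not SDK code: DataBuffer, StreamWriter, InputState, and StreamState are minimal stand-ins defined in the example, and the input is assumed to be comma-separated rows terminated by newlines. The sketch parses every complete row, leaves a trailing partial row unread, and reports its state.

```python
from enum import Enum, auto

class InputState(Enum):
    OK = auto()
    END_OF_FILE = auto()

class StreamState(Enum):
    INPUT_NEEDED = auto()
    DONE = auto()

class DataBuffer:
    """Stand-in buffer: raw bytes plus the offset of the first unread byte."""
    def __init__(self, buf):
        self.buf = buf
        self.offset = 0

class StreamWriter:
    """Stand-in writer that collects finished rows in a list."""
    def __init__(self):
        self.rows = []
        self._current = {}
    def setStringValue(self, index, value):
        self._current[index] = value
    def next(self):
        self.rows.append(self._current)
        self._current = {}

def emit_row(line, writer):
    for index, field in enumerate(line.split(b",")):
        writer.setStringValue(index, field.decode("utf-8"))
    writer.next()  # advance the output to a new row

def process(data, input_state, writer):
    while True:
        end = data.buf.find(b"\n", data.offset)
        if end == -1:
            break  # no complete row left in this buffer
        emit_row(data.buf[data.offset:end], writer)
        data.offset = end + 1  # consume the row and its terminator
    if input_state is InputState.END_OF_FILE:
        if data.offset < len(data.buf):  # final row without a terminator
            emit_row(data.buf[data.offset:], writer)
            data.offset = len(data.buf)
        return StreamState.DONE
    return StreamState.INPUT_NEEDED

writer = StreamWriter()
first = DataBuffer(b"a,b\nc,")                    # ends in the middle of a row
state_1 = process(first, InputState.OK, writer)
second = DataBuffer(first.buf[first.offset:] + b"d")  # unread bytes plus new data
state_2 = process(second, InputState.END_OF_FILE, writer)
```

In this sketch the driver code re-presents the unread bytes on the second call, which stands in for what happens when a parser returns INPUT_NEEDED without consuming the whole buffer.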

Tearing down

COPY calls destroy() after the last time that process() is called. Use destroy() to free any resources reserved by the setup() or process() method.

Vertica calls this method after the process() method indicates it has completed parsing the data source. However, sometimes data sources that have not yet been processed might remain. In such cases, Vertica might later call setup() on the object again and have it parse the data in a new data stream. Therefore, write your destroy() method so that it leaves an instance of your UDParser subclass in a state where setup() can be safely called again.
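One way to meet this requirement is to have setup() reset all per-stream state and have destroy() release it. The following Python sketch shows the idea with a hypothetical RestartableParser class that does not derive from any SDK type; the method signatures are simplified for illustration.

```python
class RestartableParser:
    """Hypothetical skeleton whose setup()/destroy() pair can run repeatedly."""

    def __init__(self):
        self.scratch = None
        self.rows_parsed = 0

    def setup(self):
        # Allocate per-stream resources and reset per-stream counters.
        self.scratch = bytearray()
        self.rows_parsed = 0

    def process(self, chunk):
        self.scratch.extend(chunk)
        self.rows_parsed += chunk.count(b"\n")

    def destroy(self):
        # Release resources; the object stays valid for another setup() call.
        self.scratch = None

parser = RestartableParser()
counts = []
for stream in (b"a\nb\n", b"c\n"):  # two data streams handled by one object
    parser.setup()
    parser.process(stream)
    counts.append(parser.rows_parsed)
    parser.destroy()
```

Because setup() rebuilds everything that destroy() released, the second stream starts from a clean state and does not see rows counted for the first one.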

Reporting rejections

If process() rejects a row, Vertica calls getRejectedRecord() to report it. Usually, this method returns an instance of the RejectedRecord class with details of the rejected row.

Writing data

A parser has an associated StreamWriter object, which you access by calling getStreamWriter(). In your process() implementation, use the type-specific set methods on the StreamWriter object (for example, setLongValue()) to write values in a row to specific column indexes. Verify that the data types you write match the data types expected by the schema.

The following example shows how you can write a value of type long to the fourth column (index 3) in the current row:

StreamWriter writer = getStreamWriter();
...
writer.setLongValue(3, 98L);

StreamWriter provides methods for all the basic types, such as setBooleanValue(), setStringValue(), and so on. See the API documentation for a complete list of StreamWriter methods, including options that take primitive types or explicitly set entries to null.

Rejecting rows

If your parser finds data it cannot parse, it should reject the row by:

  1. Saving details about the rejected row data and the reason for the rejection. These pieces of information can be directly stored in a RejectedRecord object, or in fields on your UDParser subclass, until they are needed.

  2. Updating the row's position in the input buffer by updating input.offset so it can resume parsing with the next row.

  3. Signaling that it has rejected a row by returning with the value StreamState.REJECT.

  4. Returning an instance of the RejectedRecord class with the details about the rejected row when Vertica calls getRejectedRecord().
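The steps above can be sketched in Python as follows. The StreamState, RejectedRecord, and DataBuffer classes are simplified stand-ins defined in the example rather than SDK types, IntegerLineParser is a hypothetical parser that accepts one integer per line, and the sketch treats the buffer as the entire input.

```python
from enum import Enum, auto

class StreamState(Enum):
    INPUT_NEEDED = auto()
    DONE = auto()
    REJECT = auto()

class RejectedRecord:
    """Stand-in holding the reason for a rejection and the rejected data."""
    def __init__(self, reason, data):
        self.reason = reason
        self.data = data

class DataBuffer:
    def __init__(self, buf):
        self.buf = buf
        self.offset = 0

class IntegerLineParser:
    def __init__(self):
        self.values = []
        self._rejected = None

    def process(self, data):
        while data.offset < len(data.buf):
            end = data.buf.find(b"\n", data.offset)
            if end == -1:
                return StreamState.INPUT_NEEDED
            line = data.buf[data.offset:end]
            data.offset = end + 1  # step 2: resume with the next row
            try:
                self.values.append(int(line))
            except ValueError:
                # step 1: save the rejected data and the reason
                self._rejected = RejectedRecord("not an integer", line)
                return StreamState.REJECT  # step 3
        return StreamState.DONE

    def getRejectedRecord(self):
        return self._rejected  # step 4

parser = IntegerLineParser()
data = DataBuffer(b"1\nx7\n3\n")
states, rejections = [], []
while True:  # driver standing in for the caller
    state = parser.process(data)
    states.append(state)
    if state is not StreamState.REJECT:
        break
    rejections.append(parser.getRejectedRecord())
```

Updating the offset before returning REJECT is what lets the next call to process() continue with the row after the bad one.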

Breaking up large loads

Vertica provides two ways to break up large loads. Apportioned load allows you to distribute a load among several database nodes. Cooperative parse (C++ only) allows you to distribute a load among several threads on one node.

API

In C++, the UDParser API provides the following methods for extension by subclasses:

virtual void setup(ServerInterface &srvInterface, SizedColumnTypes &returnType);

virtual bool useSideChannel();

virtual StreamState process(ServerInterface &srvInterface, DataBuffer &input, InputState input_state)=0;

virtual StreamState processWithMetadata(ServerInterface &srvInterface, DataBuffer &input,
                            LengthBuffer &input_lengths, InputState input_state)=0;

virtual void cancel(ServerInterface &srvInterface);

virtual void destroy(ServerInterface &srvInterface, SizedColumnTypes &returnType);

virtual RejectedRecord getRejectedRecord();

In Java, the UDParser API provides the following methods for extension by subclasses:

public void setup(ServerInterface srvInterface, SizedColumnTypes returnType)
    throws UdfException;

public abstract StreamState process(ServerInterface srvInterface,
                DataBuffer input, InputState input_state)
    throws UdfException, DestroyInvocation;

protected void cancel(ServerInterface srvInterface);

public void destroy(ServerInterface srvInterface, SizedColumnTypes returnType)
    throws UdfException;

public RejectedRecord getRejectedRecord() throws UdfException;

A UDParser uses a StreamWriter to write its output. StreamWriter provides methods for all the basic types, such as setBooleanValue(), setStringValue(), and so on. In the Java API this class also provides the setValue() method, which automatically sets the data type.

The methods described so far write single column values. StreamWriter also provides a method to write a complete row from a map. The setRowFromMap() method takes a map of column names and values and writes all the values into their corresponding columns. This method does not define new columns but instead writes values only to existing columns. The JsonParser example uses this method to write arbitrary JSON input. (See Java example: JSON parser.)

setRowFromMap() also populates any VMap ('raw') column of Flex Tables (see Flex tables) with the entire provided map. For most cases, setRowFromMap() is the appropriate way to populate a Flex Table. However, you can also generate a VMap value into a specified column using setVMap(), similar to other setValue() methods.

The setRowFromMap() method automatically coerces the input values into the types defined for those columns using an associated TypeCoercion. In most cases, using the default implementation (StandardTypeCoercion) is appropriate.
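The behavior described for setRowFromMap() (write only to existing columns, coercing each value to the column's type) can be illustrated with a small Python sketch. The function set_row_from_map() is a hypothetical stand-in, not the Java method: column types are modeled as Python callables, and leaving unmatched columns as None is an assumption made for the example.

```python
def set_row_from_map(columns, values):
    """Build one row from a map of column names to values.

    columns maps each existing column name to a callable that coerces a value
    to that column's type. Keys in values that match no column are ignored.
    """
    row = {name: None for name in columns}  # assumption: missing values stay None
    for name, value in values.items():
        if name in columns:
            row[name] = columns[name](value)  # coerce to the column's type
    return row

# Hypothetical table with two columns; "extra" matches no column and is skipped.
table_columns = {"id": int, "name": str}
row = set_row_from_map(table_columns, {"id": "7", "name": "x", "extra": 1})
```

Here the string "7" is coerced to the integer 7 because the id column is modeled as an integer column.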

TypeCoercion uses policies to govern its behavior. For example, the FAIL_INVALID_INPUT_VALUE policy means invalid input is treated as an error instead of using a null value. Errors are caught and handled as rejections (see "Rejecting Rows" in User-defined parser). Policies also govern whether input that is too long is truncated. Use the setPolicy() method on the parser's TypeCoercion to set policies. See the API documentation for supported values.

You might need to customize type coercion beyond setting these policies. To do so, subclass one of the provided implementations of TypeCoercion and override the asType() methods. Such customization could be necessary if your parser reads objects that come from a third-party library. A parser handling geo-coordinates, for example, might override asLong() to translate inputs like "40.4397N" into numbers. See the Vertica API documentation for a list of implementations.

In Python, the UDParser API provides the following methods for extension by subclasses:

import vertica_sdk

class PyUDParser(vertica_sdk.UDParser):
    def __init__(self):
        pass
    def setup(self, srvInterface, returnType):
        pass
    def process(self, srvInterface, inputbuffer, inputstate, streamwriter):
        # Implement your parsing logic here.
        # Read data from inputbuffer and parse it.
        # Emit rows through the streamwriter argument.
        return StreamState.DONE

In Python, the process() method requires both an input buffer and an output buffer (see InputBuffer and OutputBuffer APIs). The input buffer represents the source of the information that you want to parse. The output buffer delivers the parsed information to Vertica.

If the parser rejects a record, use the method REJECT() to identify the rejected data and the reason for the rejection.