User-defined parser

A parser takes a stream of bytes and passes a corresponding sequence of tuples to the Vertica load process. You can use user-defined parser functions to parse:

  • Data in formats not understood by the Vertica built-in parser.

  • Data that requires more specific control than the built-in parser supplies.

For example, you can load a CSV file using a specific CSV library. See the Vertica SDK for two CSV examples.

COPY supports a single user-defined parser that you can use with a user-defined source and zero or more instances of a user-defined filter. If you implement a UDParser class, you must also implement a corresponding ParserFactory.

Sometimes, you can improve the performance of your parser by adding a chunker. A chunker divides up the input and uses multiple threads to parse it. Chunkers are available only in the C++ API. For details, see Cooperative parse and UDChunker class. Under special circumstances you can further improve performance by using apportioned load, an approach where multiple Vertica nodes parse the input.

1 - UDParser class

You can subclass the UDParser class when you need to parse data that is in a format that the COPY statement's native parser cannot handle.

During parser execution, Vertica always calls three methods: setup(), process(), and destroy(). It might also call getRejectedRecord().

UDParser constructor

The UDParser class performs important initialization required by all subclasses, including initializing the StreamWriter object used by the parser. Therefore, your constructor must call super().

UDParser methods

Your UDParser subclass must override process() or processWithMetadata():

  • process() reads the raw input stream as one large file. If there are any errors or failures, the entire load fails. You can implement process() even when the upstream source or filter implements processWithMetadata(), but doing so might result in parsing errors.

  • processWithMetadata() is useful when the data source has metadata about record boundaries available in some structured format that's separate from the data payload. With this interface, the source emits a record length for each record in addition to the data.

    By implementing processWithMetadata() instead of process() in each phase, you can retain this record length metadata throughout the load stack, which enables a more efficient parse that can recover from errors on a per-message basis, rather than a per-file or per-source basis. KafkaSource and Kafka parsers (KafkaAvroParser, KafkaJSONParser, and KafkaParser) use this mechanism to support per-Kafka-message rejections when individual Kafka messages are corrupted.

Additionally, you must override getRejectedRecord() to return information about rejected records.

Optionally, you can override the other UDParser class methods.

Parser execution

The following sections detail the execution sequence each time a user-defined parser is called, assuming a parser that overrides the process() method.

Setting up

COPY calls setup() before the first time it calls process(). Use setup() to perform any initial setup tasks that your parser needs to parse data, such as retrieving parameters from the class context structure or initializing data structures for use during parsing. Your object might be destroyed and re-created during use, so make sure that your object is restartable.
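
For example, a minimal C++ setup() might cache a parameter value and reset per-load state. This is a sketch: the separator parameter and the delim and currentRecord fields are hypothetical.

virtual void setup(ServerInterface &srvInterface, SizedColumnTypes &returnType) {
    // Hypothetical optional parameter: a single-character field separator.
    ParamReader params = srvInterface.getParamReader();
    delim = params.containsParameter("separator")
          ? params.getStringRef("separator").str()[0] : '|';
    currentRecord.clear();  // reset state so the object is restartable
}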

Parsing

COPY calls process() repeatedly during query execution. Vertica passes this method a buffer of data to parse into columns and rows and one of the following input states defined by InputState:

  • OK: currently at the start of or in the middle of a stream.

  • END_OF_FILE: no further data is available.

  • END_OF_CHUNK: the current data ends on a record boundary and the parser should consume all of it before returning. This input state only occurs when using a chunker.

  • START_OF_PORTION: the input does not start at the beginning of a source. The parser should find the first end-of-record mark. This input state only occurs when using apportioned load. You can use the getPortion() method to access the offset and size of the portion.

  • END_OF_PORTION: the source has reached the end of its portion. The parser should finish processing the last record it started and advance no further. This input state only occurs when using apportioned load.

The parser must reject any data that it cannot parse, so that Vertica can report the rejection and write the rejected data to files.

The process() method must parse as much data as it can from the input buffer. The buffer might not end on a row boundary. Therefore, it might have to stop parsing in the middle of a row of input and ask for more data. The input can contain null bytes, if the source file contains them, and is not automatically null-terminated.

A parser has an associated StreamWriter object, which performs the actual writing of the data. When your parser extracts a column value, it uses one of the type-specific methods on StreamWriter to write that value to the output stream. See Writing Data for more information about these methods.

A single call to process() might write several rows of data. When your parser finishes processing a row of data, it must call next() on its StreamWriter to advance the output stream to a new row. (Usually a parser finishes processing a row because it encounters an end-of-row marker.)

When your process() method reaches the end of the buffer, it tells Vertica its current state by returning one of the following values defined by StreamState (a skeleton combining these pieces follows the list):

  • INPUT_NEEDED: the parser has reached the end of the buffer and needs more data to parse.

  • DONE: the parser has reached the end of the input data stream.

  • REJECT: the parser has rejected the last row of data it read (see Rejecting Rows).
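
The following sketch combines these pieces for a hypothetical newline-delimited format with a single integer column. parseValue() is an assumed helper, and writer is the parser's StreamWriter:

virtual StreamState process(ServerInterface &srvInterface,
                            DataBuffer &input, InputState input_state) {
    while (input.offset < input.size) {
        const char *start = input.buf + input.offset;
        // Find the end of the current record (here, a newline).
        const char *eor = static_cast<const char *>(
            memchr(start, '\n', input.size - input.offset));
        if (eor == NULL) {
            // The buffer ends mid-record: ask for more data unless
            // this is all the data there is.
            if (input_state != END_OF_FILE) return INPUT_NEEDED;
            eor = input.buf + input.size;  // final record, no terminator
        }
        writer->setInt(0, parseValue(start, eor));  // hypothetical helper
        writer->next();                             // advance to a new row
        input.offset = (eor - input.buf) + 1;
        if (input.offset > input.size) input.offset = input.size;
    }
    return (input_state == END_OF_FILE) ? DONE : INPUT_NEEDED;
}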

Tearing down

COPY calls destroy() after the last time that process() is called. Use destroy() to free any resources reserved by the setup() or process() methods.

Vertica calls this method after the process() method indicates it has completed parsing the data source. However, sometimes data sources that have not yet been processed might remain. In such cases, Vertica might later call setup() on the object again and have it parse the data in a new data stream. Therefore, write your destroy() method so that it leaves an instance of your UDParser subclass in a state where setup() can be safely called again.

Reporting rejections

If process() rejects a row, Vertica calls getRejectedRecord() to report it. Usually, this method returns an instance of the RejectedRecord class with details of the rejected row.

Writing data

A parser has an associated StreamWriter object, which you access by calling getStreamWriter(). In your process() implementation, use the setType() methods on the StreamWriter object to write values in a row to specific column indexes. Verify that the data types you write match the data types expected by the schema.

The following example shows how you can write a value of type long to the fourth column (index 3) in the current row:

StreamWriter writer = getStreamWriter();
...
writer.setLongValue(3, 98);

StreamWriter provides methods for all the basic types, such as setBooleanValue(), setStringValue(), and so on. See the API documentation for a complete list of StreamWriter methods, including options that take primitive types or explicitly set entries to null.

Rejecting rows

If your parser finds data it cannot parse, it should reject the row by performing the following steps, sketched in the example after this list:

  1. Saving details about the rejected row data and the reason for the rejection. These pieces of information can be directly stored in a RejectedRecord object, or in fields on your UDParser subclass, until they are needed.

  2. Advancing the position in the input buffer past the rejected row by updating input.offset, so that parsing can resume with the next row.

  3. Signaling that it has rejected a row by returning with the value StreamState.REJECT.

  4. Returning an instance of the RejectedRecord class with the details about the rejected row.
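
In the C++ API, the rejection flow might look like the following sketch, where rejectReason and rejectedRow are hypothetical fields on the parser and the RejectedRecord constructor mirrors the Java example later in this section:

// Inside process(), when a record cannot be parsed:
rejectReason = "invalid integer";        // 1. save the reason...
rejectedRow  = std::string(start, eor);  //    ...and the offending data
input.offset = (eor - input.buf) + 1;    // 2. advance past the bad row
return REJECT;                           // 3. signal the rejection

// 4. Vertica then calls getRejectedRecord() to fetch the details:
virtual RejectedRecord getRejectedRecord() {
    return RejectedRecord(rejectReason, rejectedRow.c_str(),
                          rejectedRow.size(), "\n");
}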

Breaking up large loads

Vertica provides two ways to break up large loads. Apportioned load allows you to distribute a load among several database nodes. Cooperative parse (C++ only) allows you to distribute a load among several threads on one node.

API

The C++ UDParser API provides the following methods for extension by subclasses:

virtual void setup(ServerInterface &srvInterface, SizedColumnTypes &returnType);

virtual bool useSideChannel();

virtual StreamState process(ServerInterface &srvInterface, DataBuffer &input, InputState input_state)=0;

virtual StreamState processWithMetadata(ServerInterface &srvInterface, DataBuffer &input,
                            LengthBuffer &input_lengths, InputState input_state)=0;

virtual void cancel(ServerInterface &srvInterface);

virtual void destroy(ServerInterface &srvInterface, SizedColumnTypes &returnType);

virtual RejectedRecord getRejectedRecord();

The Java UDParser API provides the following methods for extension by subclasses:

public void setup(ServerInterface srvInterface, SizedColumnTypes returnType)
    throws UdfException;

public abstract StreamState process(ServerInterface srvInterface,
                DataBuffer input, InputState input_state)
    throws UdfException, DestroyInvocation;

protected void cancel(ServerInterface srvInterface);

public void destroy(ServerInterface srvInterface, SizedColumnTypes returnType)
    throws UdfException;

public RejectedRecord getRejectedRecord() throws UdfException;

A UDParser uses a StreamWriter to write its output. StreamWriter provides methods for all the basic types, such as setBooleanValue(), setStringValue(), and so on. In the Java API this class also provides the setValue() method, which automatically sets the data type.

The methods described so far write single column values. StreamWriter also provides a method to write a complete row from a map. The setRowFromMap() method takes a map of column names and values and writes all the values into their corresponding columns. This method does not define new columns but instead writes values only to existing columns. The JsonParser example uses this method to write arbitrary JSON input. (See Java example: JSON parser.)

setRowFromMap() also populates any VMap ('raw') column of Flex Tables (see Flex tables) with the entire provided map. For most cases, setRowFromMap() is the appropriate way to populate a Flex Table. However, you can also generate a VMap value into a specified column using setVMap(), similar to other setValue() methods.

The setRowFromMap() method automatically coerces the input values into the types defined for those columns using an associated TypeCoercion. In most cases, using the default implementation (StandardTypeCoercion) is appropriate.

TypeCoercion uses policies to govern its behavior. For example, the FAIL_INVALID_INPUT_VALUE policy means invalid input is treated as an error instead of using a null value. Errors are caught and handled as rejections (see "Rejecting Rows" in User-defined parser). Policies also govern whether input that is too long is truncated. Use the setPolicy() method on the parser's TypeCoercion to set policies. See the API documentation for supported values.

You might need to customize type coercion beyond setting these policies. To do so, subclass one of the provided implementations of TypeCoercion and override the asType() methods. Such customization could be necessary if your parser reads objects that come from a third-party library. A parser handling geo-coordinates, for example, might override asLong() to translate inputs like "40.4397N" into numbers. See the Vertica API documentation for a list of implementations.

The Python UDParser API provides the following methods for extension by subclasses:


class PyUDParser(vertica_sdk.UDParser):
    def __init__(self):
        pass
    def setup(self, srvInterface, returnType):
        pass
    def process(self, srvInterface, inputbuffer, inputstate, streamwriter):
        # Read data from inputbuffer, parse it, and emit complete rows
        # via the streamwriter argument.
        return StreamState.DONE

In Python, the process() method requires both an input buffer and a stream writer (see InputBuffer and OutputBuffer APIs). The input buffer is the source of the data that you want to parse; the parser emits rows to Vertica through the stream writer.

In the event the parser rejects a record, use the method REJECT() to identify the rejected data and the reason for the rejection.

2 - UDChunker class

You can subclass the UDChunker class to allow your parser to support Cooperative parse. This class is available only in the C++ API.

Fundamentally, a UDChunker is a very simplistic parser. Like UDParser, it has the following three methods: setup(), process(), and destroy(). You must override process(); you may override the others. This class has one additional method, alignPortion(), which you must implement if you want to enable Apportioned load for your UDChunker.

Setting up and tearing down

As with UDParser, you can define initialization and cleanup code for your chunker. Vertica calls setup() before the first call to process() and destroy() after the last call to process(). Your object might be reused among multiple load sources, so make sure that setup() completely initializes all fields.

Chunking

Vertica calls process() to divide an input into chunks that can be parsed independently. The method takes an input buffer and an indicator of the input state:

  • OK: the input buffer begins at the start of or in the middle of a stream.

  • END_OF_FILE: no further data is available.

  • END_OF_PORTION: the source has reached the end of its portion. This state occurs only when using apportioned load.

If the input state is END_OF_FILE, the chunker should set the input.offset marker to input.size and return DONE. Returning INPUT_NEEDED is an error.

If the input state is OK, the chunker should read data from the input buffer and find record boundaries. If it finds the end of at least one record, it should align the input.offset marker with the byte after the end of the last record in the buffer and return CHUNK_ALIGNED. For example, if the input is "abc~def" and "~" is a record terminator, this method should set input.offset to 4, the position of "d". If process() reaches the end of the input without finding a record boundary, it should return INPUT_NEEDED.

You can divide the input into smaller chunks, but consuming all available records in the input can have better performance. For example, a chunker could scan backwards from the end of the input to find a record terminator, which might be the last of many records in the input, and return it all as one chunk without scanning through the rest of the input.

If the input state is END_OF_PORTION, the chunker should behave as it does for an input state of OK, except that it should also set a flag. When called again, it should find the first record in the next portion and align the chunk to that record.
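
A minimal sketch of this process() contract, assuming a single-character recordTerminator field and ignoring the END_OF_PORTION flag for brevity (the full treatment appears in C++ example: delimited parser and chunker):

virtual StreamState process(ServerInterface &srvInterface,
                            DataBuffer &input, InputState input_state) {
    if (input_state == END_OF_FILE) {
        input.offset = input.size;   // hand all remaining data to the parser
        return DONE;
    }
    // Scan backward so that every complete record in the buffer
    // becomes part of a single chunk.
    for (size_t i = input.size; i > input.offset; i--) {
        if (input.buf[i - 1] == recordTerminator) {
            input.offset = i;        // first byte past the last full record
            return CHUNK_ALIGNED;
        }
    }
    return INPUT_NEEDED;             // no record boundary found yet
}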

The input data can contain null bytes, if the source file contains them. The input argument is not automatically null-terminated.

The process() method must not block indefinitely. If this method cannot proceed for an extended period of time, it should return KEEP_GOING. Failing to return KEEP_GOING has several consequences, such as preventing your user from being able to cancel the query.

See C++ example: delimited parser and chunker for an example of the process() method using chunking.

Aligning portions

If your chunker supports apportioned load, implement the alignPortion() method. Vertica calls this method one or more times, before calling process(), to align the input offset with the beginning of the first complete chunk in the portion. The method takes an input buffer and an indicator of the input state:

  • START_OF_PORTION: the beginning of the buffer corresponds to the start of the portion. You can use the getPortion() method to access the offset and size of the portion.

  • OK: the input buffer is in the middle of a portion.

  • END_OF_PORTION: the end of the buffer corresponds to the end of the portion or beyond the end of a portion.

  • END_OF_FILE: no further data is available.

The method should scan from the beginning of the buffer to the start of the first complete record. It should set input.offset to this position and return one of the following values (a condensed sketch follows the list):

  • DONE, if it found a chunk. input.offset is the first byte of the chunk.

  • INPUT_NEEDED, if the input buffer does not contain the start of any chunk. It is an error to return this from an input state of END_OF_FILE.

  • REJECT, if the portion (not buffer) does not contain the start of any chunk.
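
A condensed sketch of this contract, again assuming a single-character recordTerminator field (the full version appears in C++ example: delimited parser and chunker):

virtual StreamState alignPortion(ServerInterface &srvInterface,
                                 DataBuffer &input, InputState state) {
    // The first record terminator ends a record belonging to the previous
    // portion; the byte after it starts this portion's first full record.
    const void *term = memchr(input.buf + input.offset, recordTerminator,
                              input.size - input.offset);
    if (term != NULL) {
        input.offset = (static_cast<const char *>(term) - input.buf) + 1;
        return DONE;
    }
    // No boundary in this buffer: reject if no more data can arrive.
    return (state == END_OF_FILE || state == END_OF_PORTION)
         ? REJECT : INPUT_NEEDED;
}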

API

The UDChunker API provides the following methods for extension by subclasses:

virtual void setup(ServerInterface &srvInterface,
            SizedColumnTypes &returnType);

virtual StreamState alignPortion(ServerInterface &srvInterface,
            DataBuffer &input, InputState state);

virtual StreamState process(ServerInterface &srvInterface,
            DataBuffer &input, InputState input_state)=0;

virtual void cancel(ServerInterface &srvInterface);

virtual void destroy(ServerInterface &srvInterface,
            SizedColumnTypes &returnType);

3 - ParserFactory class

If you write a parser, you must also write a factory to produce parser instances. To do so, subclass the ParserFactory class.

Parser factories are singletons. Your subclass must be stateless, with no fields containing data. Your subclass also must not modify any global variables.

The ParserFactory class defines the following methods. Your subclass must override the prepare() method. It may override the other methods.

Setting up

Vertica calls plan() once on the initiator node to perform the following tasks:

  • Check any parameters that have been passed from the function call in the COPY statement, and generate error messages if there are any issues. You read the parameters by getting a ParamReader object from the instance of ServerInterface passed into your plan() method.

  • Store any information that the individual hosts need in order to parse the data. For example, you could store parameters in the PlanContext instance passed in through the planCtxt parameter. The plan() method runs only on the initiator node, and the prepare() method runs on each host reading from a data source. Therefore, this object is the only means of communication between them.

    You store data in the PlanContext by getting a ParamWriter object from the getWriter() method. You then write parameters by calling methods on the ParamWriter, such as setString(). A sketch of such a plan() method follows this list.
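
The following C++ sketch validates a hypothetical separator parameter and forwards it to the other nodes. Writing through getStringRef().copy() follows the C++ VString pattern; in the Java API, use setString() instead:

virtual void plan(ServerInterface &srvInterface,
                  PerColumnParamReader &perColumnParamReader,
                  PlanContext &planCtxt) {
    std::string separator = "|";   // default when no parameter is given
    ParamReader args = srvInterface.getParamReader();
    if (args.containsParameter("separator")) {
        separator = args.getStringRef("separator").str();
        if (separator.size() != 1) {
            vt_report_error(0, "separator must be a single character");
        }
    }
    // Store the validated value so prepare() can read it on every node.
    planCtxt.getWriter().getStringRef("separator").copy(separator);
}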

Creating parsers

Vertica calls prepare() on each node to create and initialize your parser, using data stored by the plan() method.

Defining parameters

Implement getParameterType() to define the names and types of parameters that your parser uses, as in the sketch below. Vertica uses this information to warn callers about unknown or missing parameters. Vertica ignores unknown parameters and uses default values for missing parameters. While you should define the types and parameters for your function, you are not required to override this method.
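
For example, a parser taking a single optional separator parameter might declare it as follows (C++; the Java example later in this section does the same):

virtual void getParameterType(ServerInterface &srvInterface,
                              SizedColumnTypes &parameterTypes) {
    // One optional parameter: a single-character column separator.
    parameterTypes.addVarchar(1, "separator");
}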

Defining parser outputs

Implement getParserReturnType() to define the data types of the table columns that the parser outputs. If applicable, getParserReturnType() also defines the size, precision, or scale of the data types. Usually, this method reads the data types of the output table from the argType and perColumnParamReader arguments and verifies that it can output the appropriate data types. If getParserReturnType() is prepared to output the data types, it calls methods on the SizedColumnTypes object passed in the returnType argument. In addition to the data type of the output column, your method should also specify any additional information about the column's data type (a sketch follows the list below):

  • For binary and string data types (such as CHAR, VARCHAR, and LONG VARBINARY), specify the maximum length.

  • For NUMERIC types, specify the precision and scale.

  • For Time/Timestamp types (with or without time zone), specify the precision (-1 means unspecified).

  • For ARRAY types, specify the maximum number of elements.

  • For all other types, no length or precision specification is required.
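
A sketch of a C++ getParserReturnType() that accepts only integer and varchar output columns, sizing each one:

virtual void getParserReturnType(ServerInterface &srvInterface,
                                 PerColumnParamReader &perColumnParamReader,
                                 PlanContext &planCtxt,
                                 const SizedColumnTypes &argTypes,
                                 SizedColumnTypes &returnType) {
    for (size_t i = 0; i < argTypes.getColumnCount(); i++) {
        const VerticaType &t = argTypes.getColumnType(i);
        if (t.isInt()) {
            returnType.addInt(argTypes.getColumnName(i));
        } else if (t.isVarchar()) {
            // String columns carry a maximum length.
            returnType.addVarchar(t.getStringLength(),
                                  argTypes.getColumnName(i));
        } else {
            vt_report_error(0, "Unsupported output column type");
        }
    }
}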

Supporting cooperative parse

To support Cooperative parse, implement prepareChunker() and return an instance of your UDChunker subclass. If isChunkerApportionable() returns true, then it is an error for this method to return null.

Cooperative parse is currently supported only in the C++ API.

Supporting apportioned load

To support Apportioned load, your parser, chunker, or both must support apportioning. To indicate that the parser can apportion a load, implement isParserApportionable() and return true. To indicate that the chunker can apportion a load, implement isChunkerApportionable() and return true.

The isChunkerApportionable() method takes a ServerInterface as an argument, so you have access to the parameters supplied in the COPY statement. You might need this information if the user can specify a record delimiter, for example. Return true from this method if and only if the factory can create a chunker for this input.

API

The C++ ParserFactory API provides the following methods for extension by subclasses:

virtual void plan(ServerInterface &srvInterface, PerColumnParamReader &perColumnParamReader, PlanContext &planCtxt);

virtual UDParser * prepare(ServerInterface &srvInterface, PerColumnParamReader &perColumnParamReader,
            PlanContext &planCtxt, const SizedColumnTypes &returnType)=0;

virtual void getParameterType(ServerInterface &srvInterface, SizedColumnTypes &parameterTypes);

virtual void getParserReturnType(ServerInterface &srvInterface, PerColumnParamReader &perColumnParamReader,
            PlanContext &planCtxt, const SizedColumnTypes &argTypes,
            SizedColumnTypes &returnType);

virtual bool isParserApportionable();

// C++ API only:
virtual bool isChunkerApportionable(ServerInterface &srvInterface);

virtual UDChunker * prepareChunker(ServerInterface &srvInterface, PerColumnParamReader &perColumnParamReader,
            PlanContext &planCtxt, const SizedColumnTypes &returnType);

If you are using Apportioned load to divide a single input into multiple load streams, implement isParserApportionable() and/or isChunkerApportionable() and return true. Returning true from these methods does not guarantee that Vertica will apportion the load. However, returning false from both indicates that it will not try to do so.

If you are using Cooperative parse, implement prepareChunker() and return an instance of your UDChunker subclass. Cooperative parse is supported only for the C++ API.

Vertica calls the prepareChunker() method only for unfenced functions. This method is not available when you use the function in fenced mode.

If you want your chunker to be available for apportioned load, implement isChunkerApportionable() and return true.

After creating your ParserFactory, you must register it with the RegisterFactory macro.

The Java ParserFactory API provides the following methods for extension by subclasses:

public void plan(ServerInterface srvInterface, PerColumnParamReader perColumnParamReader, PlanContext planCtxt)
    throws UdfException;

public abstract UDParser prepare(ServerInterface srvInterface, PerColumnParamReader perColumnParamReader,
                PlanContext planCtxt, SizedColumnTypes returnType)
    throws UdfException;

public void getParameterType(ServerInterface srvInterface, SizedColumnTypes parameterTypes);

public void getParserReturnType(ServerInterface srvInterface, PerColumnParamReader perColumnParamReader,
                PlanContext planCtxt, SizedColumnTypes argTypes, SizedColumnTypes returnType)
    throws UdfException;

The Python ParserFactory API provides the following methods for extension by subclasses:

class PyParserFactory(vertica_sdk.ParserFactory):
    def __init__(self):
        pass
    def plan(self, srvInterface, perColumnParamReader, planCtxt):
        pass
    def prepare(self, srvInterface, perColumnParamReader, planCtxt, returnType):
        # Create and return the PyUDParser instance.
        return PyUDParser()

4 - C++ example: BasicIntegerParser

The BasicIntegerParser example parses a string of integers separated by non-numeric characters. For a version of this parser that uses continuous load, see C++ example: ContinuousIntegerParser.

Loading and using the example

Load and use the BasicIntegerParser example as follows.

=> CREATE LIBRARY BasicIntegerParserLib AS '/home/dbadmin/BIP.so';

=> CREATE PARSER BasicIntegerParser AS
LANGUAGE 'C++' NAME 'BasicIntegerParserFactory' LIBRARY BasicIntegerParserLib;

=> CREATE TABLE t (i integer);

=> COPY t FROM stdin WITH PARSER BasicIntegerParser();
0
1
2
3
4
5
\.

Implementation

The BasicIntegerParser class implements only the process() method from the API. (It also implements a helper method for type conversion.) This method scans the input for runs of digit characters, treating any non-digit byte as a record boundary. As it consumes each record it moves the input.offset marker, checks the input state, and writes the parsed value to the output.

    virtual StreamState process(ServerInterface &srvInterface, DataBuffer &input,
                InputState input_state) {
        // WARNING: This implementation is not trying for efficiency.
        // It is trying for simplicity, for demonstration purposes.

        size_t start = input.offset;
        const size_t end = input.size;

        do {
            bool found_newline = false;
            size_t numEnd = start;
            for (; numEnd < end; numEnd++) {
                if (input.buf[numEnd] < '0' || input.buf[numEnd] > '9') {
                    found_newline = true;
                    break;
                }
            }

            if (!found_newline) {
                input.offset = start;
                if (input_state == END_OF_FILE) {
                    // If we're at end-of-file,
                    // emit the last integer (if any) and return DONE.
                    if (start != end) {
                        writer->setInt(0, strToInt(input.buf + start, input.buf + numEnd));
                        writer->next();
                    }
                    return DONE;
                } else {
                    // Otherwise, we need more data.
                    return INPUT_NEEDED;
                }
            }

            writer->setInt(0, strToInt(input.buf + start, input.buf + numEnd));
            writer->next();

            start = numEnd + 1;
        } while (true);
    }
};

In the factory, the plan() method is a no-op; there are no parameters to check. The prepare() method instantiates the parser using the vt_createFuncObject template provided by the SDK:

    virtual UDParser* prepare(ServerInterface &srvInterface,
            PerColumnParamReader &perColumnParamReader,
            PlanContext &planCtxt,
            const SizedColumnTypes &returnType) {

        return vt_createFuncObject<BasicIntegerParser>(srvInterface.allocator);
    }

The getParserReturnType() method declares the single output:

    virtual void getParserReturnType(ServerInterface &srvInterface,
            PerColumnParamReader &perColumnParamReader,
            PlanContext &planCtxt,
            const SizedColumnTypes &argTypes,
            SizedColumnTypes &returnType) {
        // We only and always have a single integer column
        returnType.addInt(argTypes.getColumnName(0));
    }

As for all UDxs written in C++, the example ends by registering its factory:

RegisterFactory(BasicIntegerParserFactory);

5 - C++ example: ContinuousIntegerParser

The ContinuousIntegerParser example is a variation of BasicIntegerParser. Both examples parse integers from input strings. ContinuousIntegerParser uses Continuous load to read data.

Loading and using the example

Load the ContinuousIntegerParser example as follows.

=> CREATE LIBRARY ContinuousIntegerParserLib AS '/home/dbadmin/CIP.so';

=> CREATE PARSER ContinuousIntegerParser AS
LANGUAGE 'C++' NAME 'ContinuousIntegerParserFactory'
LIBRARY ContinuousIntegerParserLib;

Use it in the same way that you use BasicIntegerParser. See C++ example: BasicIntegerParser.

Implementation

ContinuousIntegerParser is a subclass of ContinuousUDParser. Subclasses of ContinuousUDParser place the processing logic in the run() method.

    virtual void run() {

        // This parser assumes a single-column input, and
        // a stream of ASCII integers split by non-numeric characters.
        size_t pos = 0;
        size_t reserved = cr.reserve(pos+1);
        while (!cr.isEof() || reserved == pos + 1) {
            while (reserved == pos + 1 && isdigit(*ptr(pos))) {
                pos++;
                reserved = cr.reserve(pos + 1);
            }

            std::string st(ptr(), pos);
            writer->setInt(0, strToInt(st));
            writer->next();

            while (reserved == pos + 1 && !isdigit(*ptr(pos))) {
                pos++;
                reserved = cr.reserve(pos + 1);
            }
            cr.seek(pos);
            pos = 0;
            reserved = cr.reserve(pos + 1);
        }
    }
};

For a more complex example of a ContinuousUDParser, see ExampleDelimitedParser in the examples. (See Downloading and running UDx example code.) ExampleDelimitedParser uses a chunker; see C++ example: delimited parser and chunker.

6 - Java example: numeric text

This NumericTextParser example parses integer values spelled out in words rather than digits (for example, "one two three" for 123). The parser:

  • Accepts a single parameter to set the character that separates columns in a row of data. The separator defaults to the pipe (|) character.

  • Ignores extra spaces and the capitalization of the words used to spell out the digits.

  • Recognizes the digits using the following words: zero, one, two, three, four, five, six, seven, eight, nine.

  • Assumes that the words spelling out an integer are separated by at least one space.

  • Rejects any row of data that cannot be completely parsed into integers.

  • Generates an error if the output table has a non-integer column.

Loading and using the example

Load and use the parser as follows:

=> CREATE LIBRARY JavaLib AS '/home/dbadmin/JavaLib.jar' LANGUAGE 'JAVA';
CREATE LIBRARY

=> CREATE PARSER NumericTextParser AS LANGUAGE 'java'
->    NAME 'com.myCompany.UDParser.NumericTextParserFactory'
->    LIBRARY JavaLib;
CREATE PARSER FUNCTION
=> CREATE TABLE t (i INTEGER);
CREATE TABLE
=> COPY t FROM STDIN WITH PARSER NumericTextParser();
Enter data to be copied followed by a newline.
End with a backslash and a period on a line by itself.
>> One
>> Two
>> One Two Three
>> \.
=> SELECT * FROM t ORDER BY i;
  i
-----
   1
   2
 123
(3 rows)

=> DROP TABLE t;
DROP TABLE
=> -- Parse multi-column input
=> CREATE TABLE t (i INTEGER, j INTEGER);
CREATE TABLE
=> COPY t FROM stdin WITH PARSER NumericTextParser();
Enter data to be copied followed by a newline.
End with a backslash and a period on a line by itself.
>> One | Two
>> Two | Three
>> One Two Three | four Five Six
>> \.
=> SELECT * FROM t ORDER BY i;
  i  |  j
-----+-----
   1 |   2
   2 |   3
 123 | 456
(3 rows)

=> TRUNCATE TABLE t;
TRUNCATE TABLE
=> -- Use alternate separator character
=> COPY t FROM STDIN WITH PARSER NumericTextParser(separator='*');
Enter data to be copied followed by a newline.
End with a backslash and a period on a line by itself.
>> Five * Six
>> seven * eight
>> nine * one zero
>> \.
=> SELECT * FROM t ORDER BY i;
 i | j
---+----
 5 |  6
 7 |  8
 9 | 10
(3 rows)

=> TRUNCATE TABLE t;
TRUNCATE TABLE

=> -- Rows containing data that does not parse into digits are rejected.
=> DROP TABLE t;
DROP TABLE
=> CREATE TABLE t (i INTEGER);
CREATE TABLE
=> COPY t FROM STDIN WITH PARSER NumericTextParser();
Enter data to be copied followed by a newline.
End with a backslash and a period on a line by itself.
>> One Zero Zero
>> Two Zero Zero
>> Three Zed Zed
>> Four Zero Zero
>> Five Zed Zed
>> \.
=> SELECT * FROM t ORDER BY i;
  i
-----
 100
 200
 400
(3 rows)

=> -- Generate an error by trying to copy into a table with a non-integer column
=> DROP TABLE t;
DROP TABLE
=> CREATE TABLE t (i INTEGER, j VARCHAR);
CREATE TABLE
=> COPY t FROM STDIN WITH PARSER NumericTextParser();
vsql:UDParse.sql:94: ERROR 3399:  Failure in UDx RPC call
InvokeGetReturnTypeParser(): Error in User Defined Object [NumericTextParser],
error code: 0
com.vertica.sdk.UdfException: Column 2 of output table is not an Int
        at com.myCompany.UDParser.NumericTextParserFactory.getParserReturnType
        (NumericTextParserFactory.java:70)
        at com.vertica.udxfence.UDxExecContext.getReturnTypeParser(
        UDxExecContext.java:1464)
        at com.vertica.udxfence.UDxExecContext.getReturnTypeParser(
        UDxExecContext.java:768)
        at com.vertica.udxfence.UDxExecContext.run(UDxExecContext.java:236)
        at java.lang.Thread.run(Thread.java:662)

Parser implementation

The following code implements the parser.

package com.myCompany.UDParser;

import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

import com.vertica.sdk.DataBuffer;
import com.vertica.sdk.DestroyInvocation;
import com.vertica.sdk.RejectedRecord;
import com.vertica.sdk.ServerInterface;
import com.vertica.sdk.State.InputState;
import com.vertica.sdk.State.StreamState;
import com.vertica.sdk.StreamWriter;
import com.vertica.sdk.UDParser;
import com.vertica.sdk.UdfException;

public class NumericTextParser extends UDParser {

    private String separator; // Holds column separator character

    // List of strings that we accept as digits.
    private List<String> numbers = Arrays.asList("zero", "one",
            "two", "three", "four", "five", "six", "seven",
            "eight", "nine");

    // Hold information about the last rejected row.
    private String rejectedReason;
    private String rejectedRow;

    // Constructor gets the separator character from the Factory's prepare()
    // method.
    public NumericTextParser(String sepparam) {
        super();
        this.separator = sepparam;
    }

    // Called to perform the actual work of parsing. Gets a buffer of bytes
    // to turn into tuples.
    @Override
    public StreamState process(ServerInterface srvInterface, DataBuffer input,
            InputState input_state) throws UdfException, DestroyInvocation {

        int i=input.offset; // Current position in the input buffer
        // Flag to indicate whether we just found the end of a row.
        boolean lastCharNewline = false;
        // Buffer to hold the row of data being read.
        StringBuffer line = new StringBuffer();

        //Continue reading until end of buffer.
        for(; i < input.buf.length; i++){
            // Loop through input until we find a linebreak: marks end of row
            char inchar = (char) input.buf[i];
            // Note that this isn't a robust way to find rows. It should
            // accept a user-defined row separator. Also, the following
            // assumes ASCII line break methods, which isn't a good idea
            // in the UTF world. But it is good enough for this simple example.
            if (inchar != '\n' && inchar != '\r') {
                // Keep adding to a line buffer until a full row of data is read
                line.append(inchar);
                lastCharNewline = false; // Last character not a new line
            } else {
                // Found a line break. Process the row.
                lastCharNewline = true; // indicate we got a complete row
                // Update the position in the input buffer. This is updated
                // whether the row is successfully processed or not.
                input.offset = i+1;
                // Call processRow to extract values and write tuples to the
                // output. Returns false if there was an error.
                if (!processRow(line)) {
                    // Processing row failed. Save bad row to rejectedRow field
                    // and return to caller indicating a rejected row.
                    rejectedRow = line.toString();
                    // Update position where we processed the data.
                    return StreamState.REJECT;
                }
                line.delete(0, line.length()); // clear row buffer
            }
        }

        // At this point, process() has finished processing the input buffer.
        // There are two possibilities: need to get more data
        // from the input stream to finish processing, or there is
        // no more data to process. If at the end of the input stream and
        // the row was not terminated by a linefeed, it may need
        // to process the last row.

        if (input_state == InputState.END_OF_FILE && lastCharNewline) {
            // End of input and it ended on a newline. Nothing more to do
            return StreamState.DONE;
        } else if (input_state == InputState.END_OF_FILE && !lastCharNewline) {
            // At end of input stream but didn't get a final newline. Need to
            // process the final row that was read in, then exit for good.
            if (line.length() == 0) {
                // Nothing to process. Done parsing.
                return StreamState.DONE;
            }
            // Need to parse the last row, not terminated by a linefeed. This
            // can occur if the file being read didn't have a final line break.
            if (processRow(line)) {
                return StreamState.DONE;
            } else {
                // Processing last row failed. Save bad row to rejectedRow field
                // and return to caller indicating a rejected row.
                rejectedRow = line.toString();
                // Tell Vertica the entire buffer was processed so it won't
                // call again to have the line processed.
                input.offset = input.buf.length;
                return StreamState.REJECT;
            }
        } else {
            // Stream is not fully read, so tell Vertica to send more. If
            // process() didn't get a complete row before it hit the end of the
            // input buffer, it will end up re-processing that segment again
            // when more data is added to the buffer.
            return StreamState.INPUT_NEEDED;
        }
    }

    // Breaks a row into columns, then parses the content of the
    // columns. Returns false if there was an error parsing the
    // row, in which case it sets the rejected row to the input
    // line. Returns true if the row was successfully read.
    private boolean processRow(StringBuffer line)
                                throws UdfException, DestroyInvocation {
        String[] columns = line.toString().split(Pattern.quote(separator));
        // Loop through the columns, decoding their contents
        for (int col = 0; col < columns.length; col++) {
            // Call decodeColumn to extract value from this column
            Integer colval = decodeColumn(columns[col]);
            if (colval == null) {
                // Could not parse one of the columns. Indicate row should
                // be rejected.
                return false;
            }
            // Column parsed OK. Write it to the output. writer is a field
            // provided by the parent class. Since this parser only accepts
            // integers, there is no need to verify that data type of the parsed
            // data matches the data type of the column being written. In your
            // UDParsers, you may want to perform this verification.
            writer.setLong(col,colval);
        }
        // Done with the row of data. Advance output to next row.

        // Note that this example does not verify that all of the output columns
        // have values assigned to them. If there are missing values at the
        // end of a row, they automatically get assigned a default value
        // (0 for integers). This isn't a robust solution. Your UDParser
        // should perform checks here to handle this situation and set values
        // (such as null) when appropriate.
        writer.next();
        return true; // Successfully processed the row.
    }

    // Gets a string with text numerals, i.e. "One Two Five Seven" and turns
    // it into an integer value, i.e. 1257. Returns null if the string could not
    // be parsed completely into numbers.
    private Integer decodeColumn(String text) {
        int value = 0; // Hold the value being parsed.

        // Split string into individual words. Eat extra spaces.
        String[] words = text.toLowerCase().trim().split("\\s+");

        // Loop through the words, matching them against the list of
        // digit strings.
        for (int i = 0; i < words.length; i++) {
            if (numbers.contains(words[i])) {
                // Matched a digit. Add it to the value.
                int digit = numbers.indexOf(words[i]);
                value = (value * 10) + digit;
            } else {
                // The string didn't match one of the accepted string values
                // for digits. Need to reject the row. Set the rejected
                // reason string here so it can be incorporated into the
                // rejected reason object.
                //
                // Note that this example does not handle null column values.
                // In most cases, you want to differentiate between an
                // unparseable column value and a missing piece of input
                // data. This example just rejects the row if there is a missing
                // column value.
                rejectedReason = String.format(
                        "Could not parse '%s' into a digit",words[i]);
                return null;
            }
        }
        return value;
    }

    // Vertica calls this method if the parser rejected a row of data
    // to find out what went wrong and add to the proper logs. Just gathers
    // information stored in fields and returns it in an object.
    @Override
    public RejectedRecord getRejectedRecord() throws UdfException {
        return new RejectedRecord(rejectedReason,rejectedRow.toCharArray(),
                rejectedRow.length(), "\n");
    }
}

ParserFactory implementation

The following code implements the parser factory.

NumericTextParser accepts a single optional parameter named separator. This parameter is defined in the getParameterType() method, and the plan() method stores its value. NumericTextParser outputs only integer values. Therefore, if the output table contains a column whose data type is not integer, the getParserReturnType() method throws an exception.

package com.myCompany.UDParser;

import java.util.regex.Pattern;

import com.vertica.sdk.ParamReader;
import com.vertica.sdk.ParamWriter;
import com.vertica.sdk.ParserFactory;
import com.vertica.sdk.PerColumnParamReader;
import com.vertica.sdk.PlanContext;
import com.vertica.sdk.ServerInterface;
import com.vertica.sdk.SizedColumnTypes;
import com.vertica.sdk.UDParser;
import com.vertica.sdk.UdfException;
import com.vertica.sdk.VerticaType;

public class NumericTextParserFactory extends ParserFactory {

    // Called once on the initiator host to check the parameters and set up the
    // context data that hosts performing processing will need later.
    @Override
    public void plan(ServerInterface srvInterface,
                PerColumnParamReader perColumnParamReader,
                PlanContext planCtxt) {

        String separator = "|"; // assume separator is pipe character

        // See if a parameter was given for column separator
        ParamReader args = srvInterface.getParamReader();
        if (args.containsParameter("separator")) {
            separator = args.getString("separator");
            if (separator.length() > 1) {
                throw new UdfException(0,
                        "Separator parameter must be a single character");
            }
            if (separator.matches("[a-zA-Z]")) {
                throw new UdfException(0,
                        "Separator parameter cannot be a letter");
            }
        }

        // Save separator character in the Plan Data
        ParamWriter context = planCtxt.getWriter();
        context.setString("separator", separator);
    }

    // Define the data types of the output table that the parser will return.
    // Mainly, this just ensures that all of the columns in the table which
    // is the target of the data load are integer.
    @Override
    public void getParserReturnType(ServerInterface srvInterface,
                PerColumnParamReader perColumnParamReader,
                PlanContext planCtxt,
                SizedColumnTypes argTypes,
                SizedColumnTypes returnType) {

        // Get access to the output table's columns
        for (int i = 0; i < argTypes.getColumnCount(); i++ ) {
            if (argTypes.getColumnType(i).isInt()) {
                // Column is integer... add it to the output
                 returnType.addInt(argTypes.getColumnName(i));
            } else {
                // Column isn't an int, so throw an exception.
                // Technically, not necessary since the
                // UDx framework will automatically error out when it sees a
                // discrepancy between the type in the target table and the
                // types declared by this method. Throwing this exception will
                // provide a clearer error message to the user.
                String message = String.format(
                    "Column %d of output table is not an Int", i + 1);
                throw new UdfException(0, message);
            }
        }
    }

    // Instantiate the UDParser subclass named NumericTextParser. Passes the
    // separator character as a parameter to the constructor.
    @Override
    public UDParser prepare(ServerInterface srvInterface,
            PerColumnParamReader perColumnParamReader, PlanContext planCtxt,
            SizedColumnTypes returnType) throws UdfException {
        // Get the separator character from the context
        String separator = planCtxt.getReader().getString("separator");
        return new NumericTextParser(separator);
    }

    // Describe the parameters accepted by this parser.
    @Override
    public void getParameterType(ServerInterface srvInterface,
             SizedColumnTypes parameterTypes) {
        parameterTypes.addVarchar(1, "separator");
    }
}

7 - Java example: JSON parser

The JSON Parser consumes a stream of JSON objects. Each object must be well formed and on a single line in the input. Use line breaks to delimit the objects. The parser uses the field names as keys in a map, which become column names in the table. You can find the code for this example in /opt/vertica/packages/flextable/examples. This directory also contains an example data file.

This example uses the setRowFromMap() method to write data.

Loading and using the example

Load the library and define the JSON parser, using the third-party library (gson-2.2.4.jar) as follows. See the comments in JsonParser.java for a download URL:

=> CREATE LIBRARY json
-> AS '/opt/vertica/packages/flextable/examples/java/output/json.jar'
-> DEPENDS '/opt/vertica/bin/gson-2.2.4.jar' language 'java';
CREATE LIBRARY

=> CREATE PARSER JsonParser AS LANGUAGE 'java'
-> NAME 'com.vertica.flex.JsonParserFactory' LIBRARY json;
CREATE PARSER FUNCTION

You can now define a table and then use the JSON parser to load data into it, as follows:

=> CREATE TABLE mountains(name varchar(64), type varchar(32), height integer);
CREATE TABLE

=> COPY mountains FROM '/opt/vertica/packages/flextable/examples/mountains.json'
-> WITH PARSER JsonParser();
-[ RECORD 1 ]--
Rows Loaded | 2

=> SELECT * from mountains;
-[ RECORD 1 ]--------
name   | Everest
type   | mountain
height | 29029
-[ RECORD 2 ]--------
name   | Mt St Helens
type   | volcano
height |

The data file contains a value (hike_safety) that was not loaded because the table definition did not include that column. The data file follows:

{ "name": "Everest", "type":"mountain", "height": 29029, "hike_safety": 34.1  }
{ "name": "Mt St Helens", "type": "volcano", "hike_safety": 15.4 }

Implementation

The following code shows the process() method from JsonParser.java. The parser attempts to read the input into a Map. If the read is successful, the JSON Parser calls setRowFromMap():

    @Override
    public StreamState process(ServerInterface srvInterface, DataBuffer input,
            InputState inputState) throws UdfException, DestroyInvocation {
        clearReject();
        StreamWriter output = getStreamWriter();

        while (input.offset < input.buf.length) {
            ByteBuffer lineBytes = consumeNextLine(input, inputState);

            if (lineBytes == null) {
                return StreamState.INPUT_NEEDED;
            }

            String lineString = StringUtils.newString(lineBytes);

            try {
                Map<String,Object> map = gson.fromJson(lineString, parseType);

                if (map == null) {
                    continue;
                }

                output.setRowFromMap(map);
                // No overrides needed, so just call next() here.
                output.next();
            } catch (Exception ex) {
                setReject(lineString, ex);
                return StreamState.REJECT;
            }
        }

        // All buffered input was consumed; signal DONE only at end of input.
        return inputState == InputState.END_OF_FILE
                ? StreamState.DONE : StreamState.INPUT_NEEDED;
    }

The factory, JsonParserFactory.java, instantiates and returns a parser in the prepare() method. No additional setup is required.

8 - C++ example: delimited parser and chunker

The ExampleDelimitedUDChunker class divides an input at delimiter characters. You can use this chunker with any parser that understands delimited input. ExampleDelimitedParser is a ContinuousUDParser subclass that uses this chunker.

Loading and using the example

Load and use the example as follows.

=> CREATE LIBRARY ExampleDelimitedParserLib AS '/home/dbadmin/EDP.so';

=> CREATE PARSER ExampleDelimitedParser AS
    LANGUAGE 'C++' NAME 'DelimitedParserFrameworkExampleFactory'
    LIBRARY ExampleDelimitedParserLib;

=> COPY t FROM stdin WITH PARSER ExampleDelimitedParser();
0
1
2
3
4
5
6
7
8
9
\.

Chunker implementation

This chunker supports apportioned load. The alignPortion() method finds the beginning of the first complete record in the current portion and aligns the input buffer with it. The record terminator is passed as an argument to the constructor and saved in a field.

StreamState ExampleDelimitedUDChunker::alignPortion(
            ServerInterface &srvInterface,
            DataBuffer &input, InputState state)
{
    /* find the first record terminator.  Its record belongs to the previous portion */
    void *buf = reinterpret_cast<void *>(input.buf + input.offset);
    void *term = memchr(buf, recordTerminator, input.size - input.offset);

    if (term) {
        /* record boundary found.  Align to the start of the next record */
        const size_t chunkSize = reinterpret_cast<size_t>(term) - reinterpret_cast<size_t>(buf);
        input.offset += chunkSize
            + sizeof(char) /* length of record terminator */;

        /* input.offset points at the start of the first complete record in the portion */
        return DONE;
    } else if (state == END_OF_FILE || state == END_OF_PORTION) {
        return REJECT;
    } else {
        VIAssert(state == START_OF_PORTION || state == OK);
        return INPUT_NEEDED;
    }
}

The process() method has to account for chunks that span portion boundaries. If the previous call was at the end of a portion, the method set a flag. The code begins by checking for and handling that condition. The logic is similar to that of alignPortion(), so the example calls it to do part of the division.

StreamState ExampleDelimitedUDChunker::process(
                ServerInterface &srvInterface,
                DataBuffer &input,
                InputState input_state)
{
    const size_t termLen = 1;
    const char *terminator = &recordTerminator;

    if (pastPortion) {
        /*
         * Previous state was END_OF_PORTION, and the last chunk we will produce
         * extends beyond the portion we started with, into the next portion.
         * To be consistent with alignPortion(), that means finding the first
         * record boundary, and setting the chunk to be at that boundary.
         * Fortunately, this logic is identical to aligning the portion (with
         * some slight accounting for END_OF_FILE)!
         */
        const StreamState findLastTerminator = alignPortion(srvInterface, input, input_state);

        switch (findLastTerminator) {
            case DONE:
                return DONE;
            case INPUT_NEEDED:
                if (input_state == END_OF_FILE) {
                    /* there is no more input where we might find a record terminator */
                    input.offset = input.size;
                    return DONE;
                }
                return INPUT_NEEDED;
            default:
                VIAssert("Invalid return state from alignPortion()");
        }
        return findLastTerminator;
    }

Now the method looks for the delimiter. If the input ends at a portion boundary, it sets the flag.

    size_t ret = input.offset, term_index = 0;
    for (size_t index = input.offset; index < input.size; ++index) {
        const char c = input.buf[index];
        if (c == terminator[term_index]) {
            ++term_index;
            if (term_index == termLen) {
                ret = index + 1;
                term_index = 0;
            }
            continue;
        } else if (term_index > 0) {
            index -= term_index;
        }

        term_index = 0;
    }

    if (input_state == END_OF_PORTION) {
        /*
         * Regardless of whether or not a record was found, the next chunk will extend
         * into the next portion.
         */
        pastPortion = true;
    }

Finally, process() moves the input offset and returns.

    // if we were able to find some rows, move the offset to point at the start of the next (potential) row, or end of block
    if (ret > input.offset) {
        input.offset = ret;
        return CHUNK_ALIGNED;
    }

    if (input_state == END_OF_FILE) {
        input.offset = input.size;
        return DONE;
    }

    return INPUT_NEEDED;
}

Factory implementation

The file ExampleDelimitedParser.cpp defines a factory that uses this UDChunker. The chunker supports apportioned load, so the factory implements isChunkerApportionable():

    virtual bool isChunkerApportionable(ServerInterface &srvInterface) {
        ParamReader params = srvInterface.getParamReader();
        if (params.containsParameter("disable_chunker") &&
            params.getBoolRef("disable_chunker")) {
            return false;
        } else {
            return true;
        }
    }

The prepareChunker() method creates the chunker:

    virtual UDChunker* prepareChunker(ServerInterface &srvInterface,
                                      PerColumnParamReader &perColumnParamReader,
                                      PlanContext &planCtxt,
                                      const SizedColumnTypes &returnType)
    {
        ParamReader params = srvInterface.getParamReader();
        if (params.containsParameter("disable_chunker") &&
            params.getBoolRef("disable_chunker")) {
            return NULL;
        }

        std::string recordTerminator("\n");

        ParamReader args(srvInterface.getParamReader());
        if (args.containsParameter("record_terminator")) {
            recordTerminator = args.getStringRef("record_terminator").str();
        }

        return vt_createFuncObject<ExampleDelimitedUDChunker>(srvInterface.allocator,
                recordTerminator[0]);
    }

9 - Python example: complex types JSON parser

The following example details a UDParser that takes a JSON object and parses it into complex types. For this example, the parser assumes the input data are arrays of rows with two integer fields. The input records should be separated by newline characters. If any row fields aren't specified by the JSON input, the function parses those fields as NULL.

The source code for this UDParser also contains a factory method for parsing rows that have an integer and an array of integer fields. The implementation of the parser is independent of the return type in the factory, so you can create factories with different return types that all point to the ComplexJsonParser() class in the prepare() method. The complete source code is in /opt/vertica/sdk/examples/python/UDParsers.py.

Loading and using the example

Load the library and create the parser as follows:


=> CREATE OR REPLACE LIBRARY UDParsers AS '/home/dbadmin/examples/python/UDParsers.py' LANGUAGE 'Python';

=> CREATE PARSER ComplexJsonParser AS LANGUAGE 'Python' NAME 'ArrayJsonParserFactory' LIBRARY UDParsers;

You can now define a table and then use the JSON parser to load data into it, for example:


=> CREATE TABLE orders (a bool, arr array[row(a int, b int)]);
CREATE TABLE

=> COPY orders (arr) FROM STDIN WITH PARSER ComplexJsonParser();
[]
[{"a":1, "b":10}]
[{"a":1, "b":10}, {"a":null, "b":10}]
[{"a":1, "b":10},{"a":10, "b":20}]
[{"a":1, "b":10}, {"a":null, "b":null}]
[{"a":1, "b":2}, {"a":3, "b":4}, {"a":5, "b":6}, {"a":7, "b":8}, {"a":9, "b":10}, {"a":11, "b":12}, {"a":13, "b":14}]
\.

=> SELECT * FROM orders;
a |                                  arr
--+--------------------------------------------------------------------------
  | []
  | [{"a":1,"b":10}]
  | [{"a":1,"b":10},{"a":null,"b":10}]
  | [{"a":1,"b":10},{"a":10,"b":20}]
  | [{"a":1,"b":10},{"a":null,"b":null}]
  | [{"a":1,"b":2},{"a":3,"b":4},{"a":5,"b":6},{"a":7,"b":8},{"a":9,"b":10},{"a":11,"b":12},{"a":13,"b":14}]
(6 rows)

Setup

All Python UDxs must import the Vertica SDK library. ComplexJsonParser() also requires the json library.

import vertica_sdk
import json

Factory implementation

The prepare() method instantiates and returns a parser:


def prepare(self, srvInterface, perColumnParamReader, planCtxt, returnType):
    return ComplexJsonParser()

getParserReturnType() declares that the return type must be an array of rows that each have two integer fields:


def getParserReturnType(self, srvInterface, perColumnParamReader, planCtxt, argTypes, returnType):
    fieldTypes = vertica_sdk.SizedColumnTypes.makeEmpty()
    fieldTypes.addInt('a')
    fieldTypes.addInt('b')
    returnType.addArrayType(vertica_sdk.SizedColumnTypes.makeRowType(fieldTypes, 'elements'), 64, 'arr')

Parser implementation

The process() method reads in data with an InputBuffer and then splits that input data on the newline character. The method then passes the processed data to the writeRows() method. writeRows() turns each data row into a JSON object, checks the type of that JSON object, and then writes the appropriate value or object to the output.


class ComplexJsonParser(vertica_sdk.UDParser):

    leftover = ''

    def process(self, srvInterface, input_buffer, input_state, writer):
        input_buffer.setEncoding('utf-8')

        self.count = 0
        rec = self.leftover + input_buffer.read()
        row_lst = rec.split('\n')
        self.leftover = row_lst[-1]
        self.writeRows(row_lst[:-1], writer)
        if input_state == InputState.END_OF_FILE:
            self.writeRows([self.leftover], writer)
            return StreamState.DONE
        else:
            return StreamState.INPUT_NEEDED

    def writeRows(self, str_lst, writer):
        for s in str_lst:
            stripped = s.strip()
            if len(stripped) == 0:
                return
            elif len(stripped) > 1 and stripped[0:2] == "//":
                continue
            jsonValue = json.loads(stripped)
            if type(jsonValue) is list:
                writer.setArray(0, jsonValue)
            elif jsonValue is None:
                writer.setNull(0)
            else:
                writer.setRow(0, jsonValue)
            writer.next()