Using VerticaCopyStream

The VerticaCopyStream class lets you stream data from the client system to a Vertica database. It lets you use COPY directly without first copying the data to a host in the database cluster. Using COPY to load data from the host requires superuser privileges to access the host's file system. The COPY statement used to load data from a stream does not require superuser privileges, so your client can connect with any user account that has INSERT privileges on the target table.

To copy streams into the database:

  1. Disable the database connection's AutoCommit connection parameter.

  2. Instantiate a VerticaCopyStream object, passing it at least the database connection object and a string containing a COPY statement to load the data. This statement must copy data from STDIN into your table. You can use any COPY parameters that are appropriate for your data load.

  3. Call VerticaCopyStreamObject.start() to start the COPY statement and begin streaming the data in any streams you have already added to the VerticaCopyStreamObject.

  4. Call VerticaCopyStreamObject.addStream() to add additional streams to the list of streams to send to the database. You can then call VerticaCopyStreamObject.execute() to stream them to the server.

  5. Optionally, call VerticaCopyStreamObject.getRejects() to get a list of rejected rows from the last .execute() call. The list of rejects is reset by each call to .execute() or .finish().

  6. When you are finished adding streams, call VerticaCopyStreamObject.finish() to send any remaining streams to the database and close the COPY statement.

  7. Call Connection.commit() to commit the loaded data.
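
The order of calls in steps 3 through 6 can be sketched without a database. The sketch below is only an illustration: CopyStreamLike is a hypothetical stand-in interface that mirrors the method names used in the steps, and RecordingCopyStream is a hypothetical test double that pretends each stream rejects one row. Neither is part of the Vertica client library. An adapter around the real class would also need to handle whatever checked exceptions the driver declares, and step 7 (the commit) is left to the caller.

```java
import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the VerticaCopyStream methods named in the steps.
// It exists only so that this sketch compiles without the JDBC driver.
interface CopyStreamLike {
    void start();
    void addStream(InputStream in);
    void execute();
    List<Long> getRejects();
    long finish();
}

// Hypothetical test double: records each call and pretends that every
// stream sent to the server rejects exactly one row.
final class RecordingCopyStream implements CopyStreamLike {
    final List<String> calls = new ArrayList<>();
    private List<Long> rejects = new ArrayList<>();
    private int pending = 0;

    public void start() { calls.add("start"); send(); }
    public void addStream(InputStream in) { calls.add("addStream"); pending++; }
    public void execute() { calls.add("execute"); send(); }
    public List<Long> getRejects() { return rejects; }
    public long finish() { calls.add("finish"); send(); return 0; }

    // Reset the reject list on every send, as the steps describe.
    private void send() {
        rejects = new ArrayList<>();
        for (int i = 0; i < pending; i++) {
            rejects.add((long) i + 1);
        }
        pending = 0;
    }
}

final class CopyStreamSteps {
    // Runs steps 3 through 6 and returns the total number of rejected rows.
    static long loadAll(CopyStreamLike copy, List<InputStream> inputs) {
        long rejected = 0;
        copy.start();                              // step 3: open the COPY statement
        rejected += copy.getRejects().size();      // start() may have sent queued streams
        for (InputStream in : inputs) {
            copy.addStream(in);                    // step 4: queue a stream
            copy.execute();                        //         and send it to the server
            rejected += copy.getRejects().size();  // step 5: read rejects before the next reset
        }
        copy.finish();                             // step 6: close the COPY statement
        rejected += copy.getRejects().size();      // finish() may have sent pending streams
        return rejected;
    }

    public static void main(String[] args) {
        RecordingCopyStream fake = new RecordingCopyStream();
        List<InputStream> inputs = new ArrayList<>();
        inputs.add(new ByteArrayInputStream(new byte[0]));
        inputs.add(new ByteArrayInputStream(new byte[0]));
        System.out.println("Rejected: " + loadAll(fake, inputs));
        System.out.println("Calls: " + fake.calls);
    }
}
```

Reading the rejects immediately after every call that can send data is the point of the sketch: waiting until the end of the load would return only the rejects from the last call.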

Getting rejected rows

The VerticaCopyStreamObject.getRejects() method returns a List containing the row numbers of the rows that were rejected by the previous .execute() call. Each call to .execute() clears the list of rejected rows, so you need to call .getRejects() after each call to .execute(). Because .start() and .finish() also call .execute() to send any pending streams to the server, you should call .getRejects() after these methods as well.
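
Because the list is reset on each call, one way to keep a running record is to copy the values out as soon as they are available. The following is a minimal sketch of that idea. RejectTracker and its methods are hypothetical names introduced for illustration and are not part of the Vertica client library; the limit of 5 is an arbitrary example value.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper, not part of the Vertica client API.
final class RejectTracker {
    private final int maxRejects;
    private final List<Long> allRejects = new ArrayList<>();

    RejectTracker(int maxRejects) {
        this.maxRejects = maxRejects;
    }

    // Copy the values right away, since the reject list is reset by the
    // next execute() or finish() call.
    void record(List<Long> rejectsFromLastCall) {
        allRejects.addAll(rejectsFromLastCall);
    }

    // Number of rejected rows recorded so far.
    int total() {
        return allRejects.size();
    }

    // True once more rows have been rejected than the caller will tolerate.
    boolean limitExceeded() {
        return allRejects.size() > maxRejects;
    }

    // Defensive copy of every rejected row number recorded so far.
    List<Long> rows() {
        return new ArrayList<>(allRejects);
    }

    public static void main(String[] args) {
        RejectTracker tracker = new RejectTracker(5);
        // In a real load, each argument would be the result of getRejects().
        tracker.record(Arrays.asList(3L, 7L, 51L));
        tracker.record(Arrays.asList(4143L, 6132L, 9998L));
        System.out.println("Total rejects: " + tracker.total());
        System.out.println("Over limit: " + tracker.limitExceeded());
    }
}
```

A load loop could call record() after each .execute() and stop adding streams once limitExceeded() returns true.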

The following example demonstrates loading the content of five text files stored on the client system into a table.

import java.io.File;
import java.io.FileInputStream;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import com.vertica.jdbc.VerticaConnection;
import com.vertica.jdbc.VerticaCopyStream;

public class CopyMultipleStreamsExample {
    public static void main(String[] args) {
        // Note: If running on Java 5, you need to call Class.forName
        // to manually load the JDBC driver.
        // Set up the properties of the connection
        Properties myProp = new Properties();
        myProp.put("user", "ExampleUser"); // Does not need to be a superuser for COPY FROM STDIN
        myProp.put("password", "password123");
        // When performing bulk loads, you should always disable the
        // connection's AutoCommit property to ensure the loads happen as
        // efficiently as possible by reusing the same COPY command and
        // transaction.
        myProp.put("AutoCommit", "false");
        Connection conn;
        try {
            conn = DriverManager.getConnection(
                          "jdbc:vertica://VerticaHost:5433/ExampleDB", myProp);
            Statement stmt = conn.createStatement();

            // Create a table to receive the data
            stmt.execute("DROP TABLE IF EXISTS customers");
            stmt.execute("CREATE TABLE customers (Last_Name char(50), "
                            + "First_Name char(50),Email char(50), "
                            + "Phone_Number char(15))");

            // Prepare the query to insert from a stream. This query must use
            // the COPY statement to load data from STDIN. Unlike copying from
            // a file on the host, you do not need superuser privileges to
            // copy a stream. All your user account needs is INSERT privileges
            // on the target table.
            String copyQuery = "COPY customers FROM STDIN "
                            + "DELIMITER '|' ENFORCELENGTH";

            // Create an instance of the stream class. Pass in the
            // connection and the query string.
            VerticaCopyStream stream = new VerticaCopyStream(
                            (VerticaConnection) conn, copyQuery);

            // Keep running count of the number of rejects
            int totalRejects = 0;

            // start() starts the stream process, and opens the COPY command.
            stream.start();

            // If you added streams to VerticaCopyStream before calling start(),
            // you should check for rejects here (see below). The start() method
            // calls execute() to send any pre-queued streams to the server
            // once the COPY statement has been created.

            // Simple for loop to load 5 text files named customers-1.txt to
            // customers-5.txt
            for (int loadNum = 1; loadNum <= 5; loadNum++) {
                // Prepare the input file stream. Read from a local file.
                String filename = "C:\\Data\\customers-" + loadNum + ".txt";
                System.out.println("\n\nLoading file: " + filename);
                File inputFile = new File(filename);
                FileInputStream inputStream = new FileInputStream(inputFile);

                // Add stream to the VerticaCopyStream
                stream.addStream(inputStream);

                // call execute() to load the newly added stream. You could
                // add many streams and call execute once to load them all.
                // Which method you choose depends mainly on whether you want
                // the ability to check the number of rejections as the load
                // progresses so you can stop if the number of rejects gets too
                // high. Also, high numbers of InputStreams could create a
                // resource issue on your client system.
                stream.execute();

                // Show any rejects from this execution of the stream load
                // getRejects() returns a List containing the
                // row numbers of rejected rows.
                List<Long> rejects = stream.getRejects();

                // The size of the list gives you the number of rejected rows.
                int numRejects = rejects.size();
                totalRejects += numRejects;
                System.out.println("Number of rows rejected in load #"
                                + loadNum + ": " + numRejects);

                // List all of the rows that were rejected.
                Iterator<Long> rejit = rejects.iterator();
                long linecount = 0;
                while (rejit.hasNext()) {
                    System.out.print("Rejected row #" + ++linecount);
                    System.out.println(" is row " + rejit.next());
                }
            }
            // Finish closes the COPY command. It returns the number of
            // rows inserted.
            long results = stream.finish();
            System.out.println("Finish returned " + results);

            // If you added any streams that had not yet been sent by execute(),
            // you should also check for rejects here, since finish()
            // calls execute() to send any pending streams to the server.

            // You can also get the number of rows inserted using
            // getRowCount().
            System.out.println("Number of rows accepted: "
                            + stream.getRowCount());
            System.out.println("Total number of rows rejected: " + totalRejects);

            // Commit the loaded data
            conn.commit();

        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

Running the above example on some sample data results in the following output:


Loading file: C:\Data\customers-1.txt
Number of rows rejected in load #1: 3
Rejected row #1 is row 3
Rejected row #2 is row 7
Rejected row #3 is row 51
Loading file: C:\Data\customers-2.txt
Number of rows rejected in load #2: 5
Rejected row #1 is row 4143
Rejected row #2 is row 6132
Rejected row #3 is row 9998
Rejected row #4 is row 10000
Rejected row #5 is row 10050
Loading file: C:\Data\customers-3.txt
Number of rows rejected in load #3: 9
Rejected row #1 is row 14142
Rejected row #2 is row 16131
Rejected row #3 is row 19999
Rejected row #4 is row 20001
Rejected row #5 is row 20005
Rejected row #6 is row 20049
Rejected row #7 is row 20056
Rejected row #8 is row 20144
Rejected row #9 is row 20236
Loading file: C:\Data\customers-4.txt
Number of rows rejected in load #4: 8
Rejected row #1 is row 23774
Rejected row #2 is row 24141
Rejected row #3 is row 25906
Rejected row #4 is row 26130
Rejected row #5 is row 27317
Rejected row #6 is row 28121
Rejected row #7 is row 29321
Rejected row #8 is row 29998
Loading file: C:\Data\customers-5.txt
Number of rows rejected in load #5: 1
Rejected row #1 is row 39997
Finish returned 39995
Number of rows accepted: 39995
Total number of rows rejected: 26
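
The example reads its input from files, but the data passed to addStream() only needs to arrive as an input stream, so rows that are already in memory could be wrapped in a ByteArrayInputStream instead. The sketch below shows one way to build such a stream for the COPY statement used in the example. InMemoryCopyData and toPipeDelimitedStream are hypothetical names, and the sketch assumes that newline-terminated, UTF-8 encoded records are acceptable to the load and that no field contains the delimiter or a newline.

```java
import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper, not part of the Vertica client API.
final class InMemoryCopyData {
    // Joins each row's fields with '|' and ends every row with a newline,
    // matching the DELIMITER '|' option in the example's COPY statement.
    // Assumption: no field contains '|' or a newline; nothing is escaped.
    static InputStream toPipeDelimitedStream(List<String[]> rows) {
        StringBuilder sb = new StringBuilder();
        for (String[] row : rows) {
            sb.append(String.join("|", row)).append('\n');
        }
        // Assumption: the server side expects UTF-8 encoded text.
        return new ByteArrayInputStream(sb.toString().getBytes(StandardCharsets.UTF_8));
    }

    public static void main(String[] args) throws Exception {
        List<String[]> rows = Arrays.asList(
                new String[] {"Smith", "Ann", "ann@example.com", "555-0100"},
                new String[] {"Jones", "Bo", "bo@example.com", "555-0101"});
        InputStream in = toPipeDelimitedStream(rows);
        // With a live connection, this is where stream.addStream(in) would go.
        System.out.println("Bytes ready to stream: " + in.available());
    }
}
```

Building the stream in memory avoids writing temporary files on the client, at the cost of holding the whole batch in memory at once.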