Streaming from the client via VerticaCopyStream

The VerticaCopyStream class lets you stream data from the client system to a Vertica database.

With VerticaCopyStream you can use the SQL COPY statement directly, without first copying the data to a host in the database cluster, by substituting one or more data streams for STDIN.

Notes:

  • For better performance, use transactions and disable autocommit on the COPY command.

  • Disable autocommit by adding the NO COMMIT modifier to the COPY command. You must disable commits explicitly: enabling transactions does not disable autocommit when using VerticaCopyStream.

  • The command you pass to VerticaCopyStream uses the same syntax as the SQL COPY statement.

  • VerticaCopyStream.Rejects is reset every time Execute() is called. If you want to capture the rejects (for example, their count), assign the value of VerticaCopyStream.Rejects to another variable before calling Execute() again.

  • You can add multiple streams using multiple AddStream() calls.

Example usage:

The following example demonstrates using VerticaCopyStream to copy a file stream into Vertica.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Data;
using System.IO;
using Vertica.Data.VerticaClient;
namespace ConsoleApplication
{
    /// <summary>
    /// Console example that streams a local file into a Vertica table
    /// by substituting a client-side stream for STDIN in a COPY statement.
    /// </summary>
    class Program
    {
        /// <summary>
        /// Connects to the database, recreates <c>copy_table</c>, loads
        /// <c>C:/customers.txt</c> into it through a VerticaCopyStream, and prints
        /// the number of inserted rows and the list of rejected rows.
        /// Any exception raised along the way (including a failure to connect or
        /// to open the data file) is reported by writing its message to the console.
        /// </summary>
        /// <param name="args">Command-line arguments; not used.</param>
        static void Main(string[] args)
        {
            // Configure connection properties
            VerticaConnectionStringBuilder builder = new VerticaConnectionStringBuilder();
            builder.Host = "192.168.1.10";
            builder.Database = "VMart";
            builder.User = "dbadmin";

            try
            {
                // The using block disposes (and thereby closes) the connection on every
                // path, so no separate Close() call is needed afterwards. Opening the
                // connection inside the try means a connection failure is reported the
                // same way as any other error.
                using (VerticaConnection conn = new VerticaConnection(builder.ToString()))
                {
                    conn.Open();

                    // Start a transaction
                    VerticaTransaction txn = conn.BeginTransaction();

                    // Create a table for this example
                    VerticaCommand command = new VerticaCommand("DROP TABLE IF EXISTS copy_table", conn);
                    command.ExecuteNonQuery();
                    command.CommandText = "CREATE TABLE copy_table (Last_Name char(50), "
                        + "First_Name char(50),Email char(50), "
                        + "Phone_Number char(15))";
                    command.ExecuteNonQuery();

                    string filename = "C:/customers.txt";
                    Console.WriteLine("\n\nLoading File: " + filename);

                    // Define the copy command. "no commit" keeps the load inside the
                    // transaction started above instead of committing on its own.
                    string copy = "copy copy_table from stdin record terminator E'\n' delimiter '|'" + " enforcelength "
                        + " no commit";

                    // Create a new filestream from the data file; the using block
                    // releases the file handle even if the load throws.
                    using (FileStream inputfile = File.OpenRead(filename))
                    {
                        // Create a new copy stream instance with the connection and copy statement
                        VerticaCopyStream vcs = new VerticaCopyStream(conn, copy);

                        // Start the VerticaCopyStream process
                        vcs.Start();

                        // Add the file stream
                        vcs.AddStream(inputfile, false);

                        // Execute the copy
                        vcs.Execute();

                        // Finish stream and write out the list of inserted and rejected rows
                        long rowsInserted = vcs.Finish();

                        // NOTE (from the original example): the reject list does not work
                        // when "rejected" or "exceptions" are defined in the copy statement.
                        IList<long> rowsRejected = vcs.Rejects;

                        Console.WriteLine("Number of Rows inserted: " + rowsInserted);
                        Console.WriteLine("Number of Rows rejected: " + rowsRejected.Count);
                        for (int i = 0; i < rowsRejected.Count; i++)
                        {
                            Console.WriteLine("Rejected row #{0} is row {1}", i, rowsRejected[i]);
                        }
                    }

                    // Commit the changes
                    txn.Commit();
                }
            }
            catch (Exception e)
            {
                Console.WriteLine(e.Message);
            }
        }
    }
}