Streaming from the client via VerticaCopyStream
The VerticaCopyStream class lets you stream data from the client system to a Vertica database. By substituting one or more data streams for STDIN, it lets you use the SQL COPY statement directly, without first having to copy the data to a host in the database cluster.
Notes:
-
Use transactions and disable autocommit on the COPY command for better performance.
-
Disable autocommit by adding the NO COMMIT modifier to the COPY command. You must disable commits explicitly: enabling transactions does not, by itself, disable autocommit when using VerticaCopyStream.
-
The copy command passed to VerticaCopyStream uses the same syntax as the SQL COPY statement.
-
VerticaCopyStream.Rejects is zeroed every time Execute() is called. If you want to capture the number of rejects, assign the value of VerticaCopyStream.Rejects to another variable before calling Execute() again.
-
You can add multiple streams using multiple AddStream() calls.
Example usage:
The following example demonstrates using VerticaCopyStream to copy a file stream into Vertica.
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Data;
using System.IO;
using Vertica.Data.VerticaClient;
namespace ConsoleApplication
{
    /// <summary>
    /// Console example that streams a pipe-delimited text file from the client
    /// into a Vertica table using VerticaCopyStream.
    /// </summary>
    class Program
    {
        /// <summary>
        /// Opens a connection, recreates <c>copy_table</c>, streams the data file
        /// into it with a <c>COPY ... FROM STDIN ... NO COMMIT</c> statement, prints
        /// the inserted and rejected row information, and commits the transaction.
        /// Any exception raised along the way is reported by writing its message
        /// to the console.
        /// </summary>
        /// <param name="args">Command-line arguments (unused).</param>
        static void Main(string[] args)
        {
            // Configure connection properties
            VerticaConnectionStringBuilder builder = new VerticaConnectionStringBuilder();
            builder.Host = "192.168.1.10";
            builder.Database = "VMart";
            builder.User = "dbadmin";

            try
            {
                // Create and open the connection inside the try block so that a
                // connection failure is reported by the catch below; the using
                // statement disposes the connection on every exit path.
                using (VerticaConnection conn = new VerticaConnection(builder.ToString()))
                {
                    conn.Open();

                    // Start a transaction
                    VerticaTransaction txn = conn.BeginTransaction();

                    // Create a table for this example
                    VerticaCommand command = new VerticaCommand("DROP TABLE IF EXISTS copy_table", conn);
                    command.ExecuteNonQuery();
                    command.CommandText = "CREATE TABLE copy_table (Last_Name char(50), "
                        + "First_Name char(50),Email char(50), "
                        + "Phone_Number char(15))";
                    command.ExecuteNonQuery();

                    // Define the copy command. 'no commit' keeps the load inside the
                    // transaction so that it is only committed by txn.Commit() below.
                    string copy = "copy copy_table from stdin record terminator E'\n' delimiter '|'" + " enforcelength "
                        + " no commit";

                    string filename = "C:/customers.txt";
                    Console.WriteLine("\n\nLoading File: " + filename);

                    long rowsInserted;
                    IList<long> rowsRejected;

                    // Open the data file in a using statement so the file handle is
                    // released even if the copy throws.
                    using (FileStream inputfile = File.OpenRead(filename))
                    {
                        // Create a new copy stream instance with the connection and copy statement
                        VerticaCopyStream vcs = new VerticaCopyStream(conn, copy);

                        // Start the VerticaCopyStream process
                        vcs.Start();

                        // Add the file stream
                        // NOTE(review): the meaning of the 'false' argument is not visible
                        // in this example - confirm against the driver documentation.
                        vcs.AddStream(inputfile, false);

                        // Execute the copy
                        vcs.Execute();

                        // Finish the stream, then read the inserted count and the rejected rows
                        rowsInserted = vcs.Finish();
                        rowsRejected = vcs.Rejects;
                    }

                    // Original author's note: does not work when rejected or exceptions defined
                    // (presumably refers to REJECTED DATA / EXCEPTIONS clauses in the
                    // COPY statement - TODO confirm).
                    Console.WriteLine("Number of Rows inserted: " + rowsInserted);
                    Console.WriteLine("Number of Rows rejected: " + rowsRejected.Count);
                    for (int i = 0; i < rowsRejected.Count; i++)
                    {
                        Console.WriteLine("Rejected row #{0} is row {1}", i, rowsRejected[i]);
                    }

                    // Commit the changes
                    txn.Commit();
                }
            }
            catch (Exception e)
            {
                // Report the failure; this example does not rethrow.
                Console.WriteLine(e.Message);
            }
        }
    }
}