这是本节的多页打印视图。 点击此处打印.

返回本页常规视图.

通过 ADO.NET 进行流式数据传输

以下两个选项可用于通过 ADO.NET 将客户端上的文件中的数据以流式传输到 Vertica 数据库:

  • 使用 VerticaCopyStream ADO.NET 类以面向对象的方式进行流式数据传输

  • 执行 COPY LOCAL SQL 语句以进行流式数据传输

此部分中的主题介绍了使用这些选项的方法。

1 - 通过 VerticaCopyStream 从客户端进行流式传输

使用 VerticaCopyStream 类可以将数据从客户端系统流式传输到 Vertica 数据库。通过此类,您可以直接使用 SQL COPY 语句,而不必通过将 STDIN 替换为一个或多个数据流先将数据复制到数据库群集中的主机。

注意:

  • 使用事务并对 copy 命令禁用自动提交可提高性能。

  • 可以通过将 copy 命令与“no commit”修饰符结合使用来禁用自动提交。您必须显式禁用提交。使用 VerticaCopyStream 时,启用事务不会禁用自动提交。

  • 与 VerticaCopyStream 结合使用的 copy 命令使用 copy 语法。

  • 每次调用 execute 时,都会将 VerticaCopyStream.rejects 置零。如果要捕获拒绝数,请将 VerticaCopyStream.rejects 的值分配给另一个变量,然后再次调用 execute。

  • 可以使用 AddStream() 调用添加多个流。

示例用法:

以下示例演示了使用 VerticaCopyStream 将文件流复制到 Vertica。

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Data;
using System.IO;
using Vertica.Data.VerticaClient;
namespace ConsoleApplication
{
    class Program
    {
        static void Main(string[] args)
        {
            // Configure connection properties
                    VerticaConnectionStringBuilder builder = new VerticaConnectionStringBuilder();
                builder.Host = "192.168.1.10";
            builder.Database = "VMart";
            builder.User = "dbadmin";
            //open the connection
            VerticaConnection _conn = new VerticaConnection(builder.ToString());
            _conn.Open();
            try
            {
                using (_conn)
                {
                    // Start a transaction
                            VerticaTransaction txn = _conn.BeginTransaction();

                            // Create a table for this example
                            VerticaCommand command = new VerticaCommand("DROP TABLE IF EXISTS copy_table", _conn);
                command.ExecuteNonQuery();
                    command.CommandText = "CREATE TABLE copy_table (Last_Name char(50), "
                                    + "First_Name char(50),Email char(50), "
                                    + "Phone_Number char(15))";
                    command.ExecuteNonQuery();
                    // Create a new filestream from the data file
                            string filename = "C:/customers.txt";
                 Console.WriteLine("\n\nLoading File: " + filename);
                    FileStream inputfile = File.OpenRead(filename);
                    // Define the copy command
                            string copy = "copy copy_table from stdin record terminator E'\n' delimiter '|'" + " enforcelength "
                        + " no commit";
                    // Create a new copy stream instance with the connection and copy statement
                            VerticaCopyStream vcs = new VerticaCopyStream(_conn, copy);

                            // Start the VerticaCopyStream process
                            vcs.Start();
                            // Add the file stream
                            vcs.AddStream(inputfile, false);

                            // Execute the copy
                            vcs.Execute();

                            // Finish stream and write out the list of inserted and rejected rows
                            long rowsInserted = vcs.Finish();
                IList<long> rowsRejected = vcs.Rejects;
                // Does not work when rejected or exceptions defined
                    Console.WriteLine("Number of Rows inserted: " + rowsInserted);
                    Console.WriteLine("Number of Rows rejected: " + rowsRejected.Count);
                    if (rowsRejected.Count > 0)
                    {
                        for (int i = 0; i < rowsRejected.Count; i++)
                        {
                            Console.WriteLine("Rejected row #{0} is row {1}", i, rowsRejected[i]);
                        }
                    }

                            // Commit the changes
                            txn.Commit();
            }
            }
            catch (Exception e)
            {
                Console.WriteLine(e.Message);
            }


                    //close the connection
                    _conn.Close();
    }
    }
}

2 - 将复制功能与 ADO.NET 配合使用

若要将 COPY 与 ADO.NET 结合使用,只需执行 COPY 语句并指定指向客户端系统上的源文件的路径即可。此方法比使用 VerticaCopyStream 类更简单。但是,如果有许多文件要复制到数据库,或者如果数据来自本地文件以外的其他源(例如,通过网络连接进行流式传输),您可能倾向于使用 VerticaCopyStream。

以下示例代码演示了使用 COPY 将文件从客户端复制到数据库。此示例与“使用 COPY 语句进行批量加载”中所示的代码相同,并且数据文件的路径位于客户端系统而非服务器上。

若要加载存储在数据库节点上的数据,请使用 VerticaCommand 对象创建 COPY 命令:

  1. 通过数据文件存储在的节点创建与数据库的连接

  2. 使用连接创建命令对象。

    VerticaCommand command = _conn.CreateCommand();
    
  3. 复制数据。以下是使用 COPY 命令加载数据的示例。此示例通过使用 LOCAL 修饰符发出命令来复制位于客户端本地的文件。

    command.CommandText = "copy lcopy_table from '/home/dbadmin/customers.txt'"
      + " record terminator E'\n' delimiter '|'"
      + " enforcelength ";
    
    Int32 insertedRows = command.ExecuteNonQuery();
    Console.WriteLine(insertedRows + " inserted.");
    

示例用法:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Data;
using System.IO;
using Vertica.Data.VerticaClient;
namespace ConsoleApplication
{
    class Program
    {
        static void Main(string[] args)
        {
            // Configure connection properties
            VerticaConnectionStringBuilder builder = new VerticaConnectionStringBuilder();
     builder.Host = "192.168.1.10";
        builder.Database = "VMart";
        builder.User = "dbadmin";

                   // Open the connection
                    VerticaConnection _conn = new VerticaConnection(builder.ToString());
            _conn.Open();
            try
            {
                using (_conn)
                {

                            // Start a transaction
                            VerticaTransaction txn = _conn.BeginTransaction();

                            // Create a table for this example
                            VerticaCommand command = new VerticaCommand("DROP TABLE IF EXISTS lcopy_table", _conn);
                    command.ExecuteNonQuery();
                    command.CommandText = "CREATE TABLE IF NOT EXISTS lcopy_table (Last_Name char(50), "
                                    + "First_Name char(50),Email char(50), "
                                    + "Phone_Number char(15))";
                    command.ExecuteNonQuery();
                    // Define the copy command
                            command.CommandText = "copy lcopy_table from '/home/dbadmin/customers.txt'"
            + " record terminator E'\n' delimiter '|'"
                        + " enforcelength "
                + " no commit";
                            // Execute the copy
        Int32 insertedRows = command.ExecuteNonQuery();
Console.WriteLine(insertedRows + " inserted.");
                            // Commit the changes
                            txn.Commit();
                }
            }
            catch (Exception e)
            {
                Console.WriteLine("Exception: " + e.Message);
            }


                    // Close the connection
                    _conn.Close();
        }
    }
}