此部分详细介绍可用于通过 ADO.NET 客户端驱动程序加载 Vertica 中的数据的各种方法:
1 - 使用 Vertica 数据适配器
Vertica 数据适配器 (VerticaDataAdapter) 可使客户端能够在数据集和 Vertica 数据库之间交换数据。该适配器是 DbDataAdapter 的实施。例如,您可以使用 VerticaDataAdapter 仅读取数据;或者也可以从数据库将数据读取到数据集中,然后将数据集中已更改的数据写回数据库。
批量更新
使用 Update() 方法更新数据集时,您可以选择在调用 Update() 之前使用 UpdateBatchSize() 方法,以便减少客户端在执行更新期间与服务器通信的次数。UpdateBatchSize 的默认值为 1。如果对数据集使用多个 rows.Add() 命令,则您可以将批大小更改为最佳大小以加快客户端完成更新所需执行的操作。
使用数据适配器从 Vertica 读取数据:
以下示例详细介绍如何对 VMart 架构执行 select 查询并将结果加载到 DataTable,然后将 DataTable 的内容输出到控制台。
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Data;
using System.Data.SqlClient;
Vertica.Data.VerticaClient;
namespace ConsoleApplication
{
class Program
{
static void Main(string[] args)
{
VerticaConnectionStringBuilder builder = new VerticaConnectionStringBuilder();
builder.Host = "192.168.1.10";
builder.Database = "VMart";
builder.User = "dbadmin";
VerticaConnection _conn = new VerticaConnection(builder.ToString());
_conn.Open();
// Try/Catch any exceptions
try
{
using (_conn)
{
// Create the command
VerticaCommand command = _conn.CreateCommand();
command.CommandText = "select product_key, product_description " +
"from product_dimension where product_key < 10";
// Associate the command with the connection
command.Connection = _conn;
// Create the DataAdapter
VerticaDataAdapter adapter = new VerticaDataAdapter();
adapter.SelectCommand = command;
// Fill the DataTable
DataTable table = new DataTable();
adapter.Fill(table);
// Display each row and column value.
int i = 1;
foreach (DataRow row in table.Rows)
{
foreach (DataColumn column in table.Columns)
{
Console.Write(row[column] + "\t");
}
Console.WriteLine();
i++;
}
Console.WriteLine(i + " rows returned.");
}
}
catch (Exception e)
{
Console.WriteLine(e.Message);
}
_conn.Close();
}
}
}
从 Vertica 将数据读取到数据集并更改数据:
以下示例显示了如何使用数据适配器从 VMart 架构的维度表读取数据以及将数据插入到其中。
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Data;
using System.Data.SqlClient;
using Vertica.Data.VerticaClient
namespace ConsoleApplication
{
class Program
{
static void Main(string[] args)
{
VerticaConnectionStringBuilder builder = new VerticaConnectionStringBuilder();
builder.Host = "192.168.1.10";
builder.Database = "VMart";
builder.User = "dbadmin";
VerticaConnection _conn = new VerticaConnection(builder.ToString());
_conn.Open();
// Try/Catch any exceptions
try
{
using (_conn)
{
//Create a data adapter object using the connection
VerticaDataAdapter da = new VerticaDataAdapter();
//Create a select statement that retrieves data from the table
da.SelectCommand = new
VerticaCommand("select * from product_dimension where product_key < 10",
_conn);
//Set up the insert command for the data adapter, and bind variables for some of the columns
da.InsertCommand = new
VerticaCommand("insert into product_dimension values( :key, :version, :desc )",
_conn);
da.InsertCommand.Parameters.Add(new VerticaParameter("key", VerticaType.BigInt));
da.InsertCommand.Parameters.Add(new VerticaParameter("version", VerticaType.BigInt));
da.InsertCommand.Parameters.Add(new VerticaParameter("desc", VerticaType.VarChar));
da.InsertCommand.Parameters[0].SourceColumn = "product_key";
da.InsertCommand.Parameters[1].SourceColumn = "product_version";
da.InsertCommand.Parameters[2].SourceColumn = "product_description";
da.TableMappings.Add("product_key", "product_key");
da.TableMappings.Add("product_version", "product_version");
da.TableMappings.Add("product_description", "product_description");
//Create and fill a Data set for this dimension table, and get the resulting DataTable.
DataSet ds = new DataSet();
da.Fill(ds, 0, 0, "product_dimension");
DataTable dt = ds.Tables[0];
//Bind parameters and add two rows to the table.
DataRow dr = dt.NewRow();
dr["product_key"] = 838929;
dr["product_version"] = 5;
dr["product_description"] = "New item 5";
dt.Rows.Add(dr);
dr = dt.NewRow();
dr["product_key"] = 838929;
dr["product_version"] = 6;
dr["product_description"] = "New item 6";
dt.Rows.Add(dr);
//Extract the changes for the added rows.
DataSet ds2 = ds.GetChanges();
//Send the modifications to the server.
int updateCount = da.Update(ds2, "product_dimension");
//Merge the changes into the original Data set, and mark it up to date.
ds.Merge(ds2);
ds.AcceptChanges();
Console.WriteLine(updateCount + " updates made!");
}
}
catch (Exception e)
{
Console.WriteLine(e.Message);
}
_conn.Close();
}
}
}
2 - 使用批量插入和预定义的语句
可以使用带有参数的预定义的语句批量加载数据。如果遇到任何错误,您还可以使用事务回退批量加载。
如果要加载大批量数据(超过 100 MB),请考虑使用直接批量插入。
以下示例详细介绍使用包含在数组中的数据以及参数和事务来批量加载数据。
可以通过以下命令创建在此示例中使用的 test 表:
=> CREATE TABLE test (id INT, username VARCHAR(24), email VARCHAR(64), password VARCHAR(8));
使用参数和事务的示例批量插入
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Data;
using Vertica.Data.VerticaClient;
namespace ConsoleApplication
{
class Program
{
static void Main(string[] args)
{
VerticaConnectionStringBuilder builder = new VerticaConnectionStringBuilder();
builder.Host = "192.168.1.10";
builder.Database = "VMart";
builder.User = "dbadmin";
VerticaConnection _conn = new VerticaConnection(builder.ToString());
_conn.Open();
// Create arrays for column data
int[] ids = {1, 2, 3, 4};
string[] usernames = {"user1", "user2", "user3", "user4"};
string[] emails = { "user1@example.com", "user2@example.com","user3@example.com","user4@example.com" };
string[] passwords = { "pass1", "pass2", "pass3", "pass4" };
// create counters for accepted and rejected rows
int rows = 0;
int rejRows = 0;
bool error = false;
// Create the transaction
VerticaTransaction txn = _conn.BeginTransaction();
// Create the parameterized query and assign parameter types
VerticaCommand command = _conn.CreateCommand();
command.CommandText = "insert into TEST values (@id, @username, @email, @password)";
command.Parameters.Add(new VerticaParameter("id", VerticaType.BigInt));
command.Parameters.Add(new VerticaParameter("username", VerticaType.VarChar));
command.Parameters.Add(new VerticaParameter("email", VerticaType.VarChar));
command.Parameters.Add(new VerticaParameter("password", VerticaType.VarChar));
// Prepare the statement
command.Prepare();
// Loop through the column arrays and insert the data
for (int i = 0; i < ids.Length; i++) {
command.Parameters["id"].Value = ids[i];
command.Parameters["username"].Value = usernames[i];
command.Parameters["email"].Value = emails[i];
command.Parameters["password"].Value = passwords[i];
try
{
rows += command.ExecuteNonQuery();
}
catch (Exception e)
{
Console.WriteLine("\nInsert failed - \n " + e.Message + "\n");
++rejRows;
error = true;
}
}
if (error)
{
// Roll back if errors
Console.WriteLine("Errors. Rolling Back Transaction.");
Console.WriteLine(rejRows + " rows rejected.");
txn.Rollback();
}
else
{
// Commit if no errors
Console.WriteLine("No Errors. Committing Transaction.");
txn.Commit();
Console.WriteLine("Inserted " + rows + " rows. ");
}
_conn.Close();
}
}
}
3 - 通过 ADO.NET 进行流式数据传输
以下两个选项可用于通过 ADO.NET 将客户端上的文件中的数据以流式传输到 Vertica 数据库:
-
使用
VerticaCopyStream
ADO.NET 类以面向对象的方式进行流式数据传输 -
执行 COPY LOCAL SQL 语句以进行流式数据传输
此部分中的主题介绍了使用这些选项的方法。
3.1 - 通过 VerticaCopyStream 从客户端进行流式传输
使用 VerticaCopyStream
类可以将数据从客户端系统流式传输到 Vertica 数据库。通过此类,您可以直接使用 SQL COPY 语句,而不必通过将 STDIN 替换为一个或多个数据流先将数据复制到数据库群集中的主机。
注意:
-
使用事务并对 copy 命令禁用自动提交可提高性能。
-
可以通过将 copy 命令与“no commit”修饰符结合使用来禁用自动提交。您必须显式禁用提交。使用 VerticaCopyStream 时,启用事务不会禁用自动提交。
-
与 VerticaCopyStream 结合使用的 copy 命令使用 copy 语法。
-
每次调用 execute 时,都会将 VerticaCopyStream.rejects 置零。如果要捕获拒绝数,请将 VerticaCopyStream.rejects 的值分配给另一个变量,然后再次调用 execute。
-
可以使用 AddStream() 调用添加多个流。
示例用法:
以下示例演示了使用 VerticaCopyStream 将文件流复制到 Vertica。
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Data;
using System.IO;
using Vertica.Data.VerticaClient;
namespace ConsoleApplication
{
class Program
{
static void Main(string[] args)
{
// Configure connection properties
VerticaConnectionStringBuilder builder = new VerticaConnectionStringBuilder();
builder.Host = "192.168.1.10";
builder.Database = "VMart";
builder.User = "dbadmin";
//open the connection
VerticaConnection _conn = new VerticaConnection(builder.ToString());
_conn.Open();
try
{
using (_conn)
{
// Start a transaction
VerticaTransaction txn = _conn.BeginTransaction();
// Create a table for this example
VerticaCommand command = new VerticaCommand("DROP TABLE IF EXISTS copy_table", _conn);
command.ExecuteNonQuery();
command.CommandText = "CREATE TABLE copy_table (Last_Name char(50), "
+ "First_Name char(50),Email char(50), "
+ "Phone_Number char(15))";
command.ExecuteNonQuery();
// Create a new filestream from the data file
string filename = "C:/customers.txt";
Console.WriteLine("\n\nLoading File: " + filename);
FileStream inputfile = File.OpenRead(filename);
// Define the copy command
string copy = "copy copy_table from stdin record terminator E'\n' delimiter '|'" + " enforcelength "
+ " no commit";
// Create a new copy stream instance with the connection and copy statement
VerticaCopyStream vcs = new VerticaCopyStream(_conn, copy);
// Start the VerticaCopyStream process
vcs.Start();
// Add the file stream
vcs.AddStream(inputfile, false);
// Execute the copy
vcs.Execute();
// Finish stream and write out the list of inserted and rejected rows
long rowsInserted = vcs.Finish();
IList<long> rowsRejected = vcs.Rejects;
// Does not work when rejected or exceptions defined
Console.WriteLine("Number of Rows inserted: " + rowsInserted);
Console.WriteLine("Number of Rows rejected: " + rowsRejected.Count);
if (rowsRejected.Count > 0)
{
for (int i = 0; i < rowsRejected.Count; i++)
{
Console.WriteLine("Rejected row #{0} is row {1}", i, rowsRejected[i]);
}
}
// Commit the changes
txn.Commit();
}
}
catch (Exception e)
{
Console.WriteLine(e.Message);
}
//close the connection
_conn.Close();
}
}
}
3.2 - 将复制功能与 ADO.NET 配合使用
若要将 COPY 与 ADO.NET 结合使用,只需执行 COPY 语句并指定指向客户端系统上的源文件的路径即可。此方法比使用 VerticaCopyStream 类更简单。但是,如果有许多文件要复制到数据库,或者如果数据来自本地文件以外的其他源(例如,通过网络连接进行流式传输),您可能倾向于使用 VerticaCopyStream。
以下示例代码演示了使用 COPY 将文件从客户端复制到数据库。此示例与“使用 COPY 语句进行批量加载”中所示的代码相同,并且数据文件的路径位于客户端系统而非服务器上。
若要加载存储在数据库节点上的数据,请使用 VerticaCommand 对象创建 COPY 命令:
-
通过数据文件存储在的节点创建与数据库的连接。
-
使用连接创建命令对象。
VerticaCommand command = _conn.CreateCommand();
-
复制数据。以下是使用 COPY 命令加载数据的示例。此示例通过使用 LOCAL 修饰符发出命令来复制位于客户端本地的文件。
command.CommandText = "copy lcopy_table from '/home/dbadmin/customers.txt'" + " record terminator E'\n' delimiter '|'" + " enforcelength "; Int32 insertedRows = command.ExecuteNonQuery(); Console.WriteLine(insertedRows + " inserted.");
示例用法:
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Data;
using System.IO;
using Vertica.Data.VerticaClient;
namespace ConsoleApplication
{
class Program
{
static void Main(string[] args)
{
// Configure connection properties
VerticaConnectionStringBuilder builder = new VerticaConnectionStringBuilder();
builder.Host = "192.168.1.10";
builder.Database = "VMart";
builder.User = "dbadmin";
// Open the connection
VerticaConnection _conn = new VerticaConnection(builder.ToString());
_conn.Open();
try
{
using (_conn)
{
// Start a transaction
VerticaTransaction txn = _conn.BeginTransaction();
// Create a table for this example
VerticaCommand command = new VerticaCommand("DROP TABLE IF EXISTS lcopy_table", _conn);
command.ExecuteNonQuery();
command.CommandText = "CREATE TABLE IF NOT EXISTS lcopy_table (Last_Name char(50), "
+ "First_Name char(50),Email char(50), "
+ "Phone_Number char(15))";
command.ExecuteNonQuery();
// Define the copy command
command.CommandText = "copy lcopy_table from '/home/dbadmin/customers.txt'"
+ " record terminator E'\n' delimiter '|'"
+ " enforcelength "
+ " no commit";
// Execute the copy
Int32 insertedRows = command.ExecuteNonQuery();
Console.WriteLine(insertedRows + " inserted.");
// Commit the changes
txn.Commit();
}
}
catch (Exception e)
{
Console.WriteLine("Exception: " + e.Message);
}
// Close the connection
_conn.Close();
}
}
}