可以使用以下任意方法通过 JDBC 接口加载数据:
-
执行 SQL INSERT 语句以直接插入单个行。
-
使用预定义的语句批量加载数据。
-
使用 COPY 批量加载文件或流中的数据。
以下几节详细介绍如何使用 JDBC 加载数据。
可以使用以下任意方法通过 JDBC 接口加载数据:
执行 SQL INSERT 语句以直接插入单个行。
使用预定义的语句批量加载数据。
使用 COPY 批量加载文件或流中的数据。
以下几节详细介绍如何使用 JDBC 加载数据。
将数据插入到表中的最简单方法是使用 SQL INSERT 语句。可以将 Statement 类的成员实例化以使用此语句,并使用其 executeUpdate() 方法以运行 SQL 语句。
以下代码片段演示了如何创建 Statement 对象并使用该对象将数据插入到名为 address_book 的表中:
Statement stmt = conn.createStatement();
stmt.executeUpdate("INSERT INTO address_book " +
"VALUES ('Smith', 'John', 'jsmith@example.com', " +
"'555-123-4567')");
此方法有几个缺点:您需要将数据转换为字符串并对数据中的特殊字符进行转义。插入数据的更好方法是使用预定义的语句。请参阅使用 JDBC 预定义的语句批量插入。
可以使用预定义的 INSERT 语句(仅需设置一次即可重复调用的服务器端语句)将数据批量加载到 Vertica 中。您可以使用包含表示数据的问号占位符的 SQL 语句将 PreparedStatement 类的成员实例化。例如:
PreparedStatement pstmt = conn.prepareStatement(
"INSERT INTO customers(last, first, id) VALUES(?,?,?)");
然后,可以对 PreparedStatement 对象使用特定于数据类型的方法(例如 setString() 和 setInt())来设置参数。设置参数后,调用 addBatch() 方法以将行添加到批中。整个数据批准备就绪后,调用 executeBatch() 方法以执行批量插入。
在后台,批量插入会转换为 COPY 语句。如果已禁用连接的 AutoCommit 参数,则 Vertica 会保持打开 COPY 语句并使用该语句加载后续的批,直至事务已提交或者游标已关闭或应用程序执行任何其他操作(或者使用其他 Statement 或 PreparedStatement 对象执行任何语句)为止。使用单个 COPY 语句进行多个批量插入可以更高效地加载数据。如果要加载多个批,应禁用数据库的 AutoCommit 属性以提高效率。
执行批量插入时,可以试验各个批大小和行大小以确定可提供最佳性能的设置。
以下示例演示了使用预定义的语句批量插入数据。
import java.sql.*;
import java.util.Properties;
public class BatchInsertExample {
public static void main(String[] args) {
Properties myProp = new Properties();
myProp.put("user", "ExampleUser");
myProp.put("password", "password123");
//Set streamingBatchInsert to True to enable streaming mode for batch inserts.
//myProp.put("streamingBatchInsert", "True");
Connection conn;
try {
conn = DriverManager.getConnection(
"jdbc:vertica://VerticaHost:5433/ExampleDB",
myProp);
// establish connection and make a table for the data.
Statement stmt = conn.createStatement();
// Set AutoCommit to false to allow Vertica to reuse the same
// COPY statement
conn.setAutoCommit(false);
// Drop table and recreate.
stmt.execute("DROP TABLE IF EXISTS customers CASCADE");
stmt.execute("CREATE TABLE customers (CustID int, Last_Name"
+ " char(50), First_Name char(50),Email char(50), "
+ "Phone_Number char(12))");
// Some dummy data to insert.
String[] firstNames = new String[] { "Anna", "Bill", "Cindy",
"Don", "Eric" };
String[] lastNames = new String[] { "Allen", "Brown", "Chu",
"Dodd", "Estavez" };
String[] emails = new String[] { "aang@example.com",
"b.brown@example.com", "cindy@example.com",
"d.d@example.com", "e.estavez@example.com" };
String[] phoneNumbers = new String[] { "123-456-7890",
"555-444-3333", "555-867-5309",
"555-555-1212", "781-555-0000" };
// Create the prepared statement
PreparedStatement pstmt = conn.prepareStatement(
"INSERT INTO customers (CustID, Last_Name, " +
"First_Name, Email, Phone_Number)" +
" VALUES(?,?,?,?,?)");
// Add rows to a batch in a loop. Each iteration adds a
// new row.
for (int i = 0; i < firstNames.length; i++) {
// Add each parameter to the row.
pstmt.setInt(1, i + 1);
pstmt.setString(2, lastNames[i]);
pstmt.setString(3, firstNames[i]);
pstmt.setString(4, emails[i]);
pstmt.setString(5, phoneNumbers[i]);
// Add row to the batch.
pstmt.addBatch();
}
try {
// Batch is ready, execute it to insert the data
pstmt.executeBatch();
} catch (SQLException e) {
System.out.println("Error message: " + e.getMessage());
return; // Exit if there was an error
}
// Commit the transaction to close the COPY command
conn.commit();
// Print the resulting table.
ResultSet rs = null;
rs = stmt.executeQuery("SELECT CustID, First_Name, "
+ "Last_Name FROM customers ORDER BY CustID");
while (rs.next()) {
System.out.println(rs.getInt(1) + " - "
+ rs.getString(2).trim() + " "
+ rs.getString(3).trim());
}
// Cleanup
conn.close();
} catch (SQLException e) {
e.printStackTrace();
}
}
}
运行此示例代码后的结果如下所示:
1 - Anna Allen
2 - Bill Brown
3 - Cindy Chu
4 - Don Dodd
5 - Eric Estavez
默认情况下,Vertica 通过缓存每个行并在用户调用 executeBatch() 方法时插入缓存来执行批量插入。Vertica 也支持流式批量插入。流式批量插入在用户每次调用 addBatch() 时将行添加到数据库中。流式批量插入可提高数据库性能,因为它能够进行并行处理并降低内存需求。
若要启用流式批量插入,请将 streamingBatchInsert 属性设置为 True。以上代码示例包含一个行,该行启用了 streamingBatchInsert 模式。移除 // 注释标记可启用该行并激活流式批量插入。
下表介绍了各种批量插入方法及其默认批量插入模式和流式批量插入模式之间的行为差异。
使用 PreparedStatement.setFloat() 方法会导致出现舍入误差。如果精度很重要,请改为使用 .setDouble() 方法。
预定义语句时,PreparedStatement 对象会缓存连接的 AutoCommit 属性。以后对 AutoCommit 属性进行的更改不会影响预定义的语句。
加载各个批时,您可以确定已接受的行数和已拒绝了哪些行(有关详细信息,请参阅确定已接受的行和已拒绝的行)。如果禁用了 AutoCommit 连接设置,则插入各个批时不会发生其他错误(例如磁盘空间错误)。此行为是由单个 SQL COPY 语句对多个连续批执行加载(这样可提高加载过程的效率)导致的。只有 COPY 语句关闭时,才会提交批量数据,而且 Vertica 会报告其他类型错误。
因此,应使批量加载应用程序准备好在 COPY 语句关闭时检查错误。可以通过以下方法促使 COPY 语句关闭:
通过调用 Connection.commit() 来结束批量加载事务
通过使用 Statement.close() 来关闭该语句
在加载中插入最后一批之前将连接的 AutoCommit 属性设置为 true
Statement 或 PreparedStatement 对象执行任何语句,COPY 语句也会关闭。使用以上任一方法结束 COPY 语句会造成混乱并导致更难以维护应用程序,因为您需要处理非批量加载语句中的批量加载错误。您应当在批量加载结束时显式终止 COPY 语句,同时处理任何错误。
PreparedStatement.executeBatch 的返回值是一个整数数组,其中包含每个行的插入操作的成功或失败状态。值 1 表示行已被接受,而值 -3 表示行已被拒绝。如果在批处理执行期间出现了异常,您还可以使用 BatchUpdateException.getUpdateCounts() 获取该数组。
以下示例延伸了 使用 JDBC 预定义的语句批量插入 中所示的示例,以检索该数组并显示批量加载的结果。
import java.sql.*;
import java.util.Arrays;
import java.util.Properties;
public class BatchInsertErrorHandlingExample {
public static void main(String[] args) {
Properties myProp = new Properties();
myProp.put("user", "ExampleUser");
myProp.put("password", "password123");
Connection conn;
// establish connection and make a table for the data.
try {
conn = DriverManager.getConnection(
"jdbc:vertica://VerticaHost:5433/ExampleDB",
myProp);
// Disable auto commit
conn.setAutoCommit(false);
// Create a statement
Statement stmt = conn.createStatement();
// Drop table and recreate.
stmt.execute("DROP TABLE IF EXISTS customers CASCADE");
stmt.execute("CREATE TABLE customers (CustID int, Last_Name"
+ " char(50), First_Name char(50),Email char(50), "
+ "Phone_Number char(12))");
// Some dummy data to insert. The one row won't insert because
// the phone number is too long for the phone column.
String[] firstNames = new String[] { "Anna", "Bill", "Cindy",
"Don", "Eric" };
String[] lastNames = new String[] { "Allen", "Brown", "Chu",
"Dodd", "Estavez" };
String[] emails = new String[] { "aang@example.com",
"b.brown@example.com", "cindy@example.com",
"d.d@example.com", "e.estavez@example.com" };
String[] phoneNumbers = new String[] { "123-456-789",
"555-444-3333", "555-867-53093453453",
"555-555-1212", "781-555-0000" };
// Create the prepared statement
PreparedStatement pstmt = conn.prepareStatement(
"INSERT INTO customers (CustID, Last_Name, " +
"First_Name, Email, Phone_Number)" +
" VALUES(?,?,?,?,?)");
// Add rows to a batch in a loop. Each iteration adds a
// new row.
for (int i = 0; i < firstNames.length; i++) {
// Add each parameter to the row.
pstmt.setInt(1, i + 1);
pstmt.setString(2, lastNames[i]);
pstmt.setString(3, firstNames[i]);
pstmt.setString(4, emails[i]);
pstmt.setString(5, phoneNumbers[i]);
// Add row to the batch.
pstmt.addBatch();
}
// Integer array to hold the results of inserting
// the batch. Will contain an entry for each row,
// indicating success or failure.
int[] batchResults = null;
try {
// Batch is ready, execute it to insert the data
batchResults = pstmt.executeBatch();
} catch (BatchUpdateException e) {
// We expect an exception here, since one of the
// inserted phone numbers is too wide for its column. All of the
// rest of the rows will be inserted.
System.out.println("Error message: " + e.getMessage());
// Batch results isn't set due to exception, but you
// can get it from the exception object.
//
// In your own code, you shouldn't assume the a batch
// exception occurred, since exceptions can be thrown
// by the server for a variety of reasons.
batchResults = e.getUpdateCounts();
}
// You should also be prepared to catch SQLExceptions in your own
// application code, to handle dropped connections and other general
// problems.
// Commit the transaction
conn.commit();
// Print the array holding the results of the batch insertions.
System.out.println("Return value from inserting batch: "
+ Arrays.toString(batchResults));
// Print the resulting table.
ResultSet rs = null;
rs = stmt.executeQuery("SELECT CustID, First_Name, "
+ "Last_Name FROM customers ORDER BY CustID");
while (rs.next()) {
System.out.println(rs.getInt(1) + " - "
+ rs.getString(2).trim() + " "
+ rs.getString(3).trim());
}
// Cleanup
conn.close();
} catch (SQLException e) {
e.printStackTrace();
}
}
}
运行以上示例后,将在控制台上生成以下输出:
Error message: [Vertica][VJDBC](100172) One or more rows were rejected by the server.Return value from inserting batch: [1, 1, -3, 1, 1]
1 - Anna Allen
2 - Bill Brown
4 - Don Dodd
5 - Eric Estavez
请注意,第三行插入失败,因为其电话号码对 Phone_Number 列来说太长。该批中其余的所有行(包括该错误之后的行)已正确插入。
即使一个或多个行被拒绝,批量加载也始终插入所有数据。只有某个批中导致发生错误的行不会加载。如果数据库连接的 AutoCommit 属性为 true,各个批会在完成后自动提交其事务,因此在该批完成加载后,将提交数据。
在某些情况下,您可能希望成功插入某个批中的所有数据(在发生错误时不应提交任何数据)。若要实现此目的,最佳方法是关闭数据库连接的 AutoCommit 属性以防止各个批自动提交其自身。然后,如果某个批遇到错误,您可以在捕获由插入错误导致的 BatchUpdateException 之后回退事务。
以下示例演示了在加载某个批期间发生任何错误时执行回退。
import java.sql.*;
import java.util.Arrays;
import java.util.Properties;
public class RollbackBatchOnError {
public static void main(String[] args) {
Properties myProp = new Properties();
myProp.put("user", "ExampleUser");
myProp.put("password", "password123");
Connection conn;
try {
conn = DriverManager.getConnection(
"jdbc:vertica://VerticaHost:5433/ExampleDB",
myProp);
// Disable auto-commit. This will allow you to roll back a
// a batch load if there is an error.
conn.setAutoCommit(false);
// establish connection and make a table for the data.
Statement stmt = conn.createStatement();
// Drop table and recreate.
stmt.execute("DROP TABLE IF EXISTS customers CASCADE");
stmt.execute("CREATE TABLE customers (CustID int, Last_Name"
+ " char(50), First_Name char(50),Email char(50), "
+ "Phone_Number char(12))");
// Some dummy data to insert. The one row won't insert because
// the phone number is too long for the phone column.
String[] firstNames = new String[] { "Anna", "Bill", "Cindy",
"Don", "Eric" };
String[] lastNames = new String[] { "Allen", "Brown", "Chu",
"Dodd", "Estavez" };
String[] emails = new String[] { "aang@example.com",
"b.brown@example.com", "cindy@example.com",
"d.d@example.com", "e.estavez@example.com" };
String[] phoneNumbers = new String[] { "123-456-789",
"555-444-3333", "555-867-53094535", "555-555-1212",
"781-555-0000" };
// Create the prepared statement
PreparedStatement pstmt = conn.prepareStatement(
"INSERT INTO customers (CustID, Last_Name, " +
"First_Name, Email, Phone_Number) "+
"VALUES(?,?,?,?,?)");
// Add rows to a batch in a loop. Each iteration adds a
// new row.
for (int i = 0; i < firstNames.length; i++) {
// Add each parameter to the row.
pstmt.setInt(1, i + 1);
pstmt.setString(2, lastNames[i]);
pstmt.setString(3, firstNames[i]);
pstmt.setString(4, emails[i]);
pstmt.setString(5, phoneNumbers[i]);
// Add row to the batch.
pstmt.addBatch();
}
// Integer array to hold the results of inserting
// the batch. Will contain an entry for each row,
// indicating success or failure.
int[] batchResults = null;
try {
// Batch is ready, execute it to insert the data
batchResults = pstmt.executeBatch();
// If we reach here, we inserted the batch without errors.
// Commit it.
System.out.println("Batch insert successful. Committing.");
conn.commit();
} catch (BatchUpdateException e) {
System.out.println("Error message: " + e.getMessage());
// Batch results isn't set due to exception, but you
// can get it from the exception object.
batchResults = e.getUpdateCounts();
// Roll back the batch transaction.
System.out.println("Rolling back batch insertion");
conn.rollback();
}
catch (SQLException e) {
// General SQL errors, such as connection issues, throw
// SQLExceptions. Your application should do something more
// than just print a stack trace,
e.printStackTrace();
}
System.out.println("Return value from inserting batch: "
+ Arrays.toString(batchResults));
System.out.println("Customers table contains:");
// Print the resulting table.
ResultSet rs = null;
rs = stmt.executeQuery("SELECT CustID, First_Name, "
+ "Last_Name FROM customers ORDER BY CustID");
while (rs.next()) {
System.out.println(rs.getInt(1) + " - "
+ rs.getString(2).trim() + " "
+ rs.getString(3).trim());
}
// Cleanup
conn.close();
} catch (SQLException e) {
e.printStackTrace();
}
}
}
运行以上示例后,将在系统控制台上输出以下内容:
Error message: [Vertica][VJDBC](100172) One or more rows were rejected by the server.Rolling back batch insertion
Return value from inserting batch: [1, 1, -3, 1, 1]
Customers table contains:
返回值指示是否已成功插入每个行。值 1 表示已插入该行并且未出现任何问题,而值 -3 表示插入该行失败。
客户表为空,因为已回退该批量插入,原因是第三列导致发生错误。
一次将大量数据加载(批量加载)到 Vertica 的最快方法之一是使用 COPY 语句。此语句可将存储在 Vertica 主机上的文件中的数据(或数据流中的数据)加载到数据库中的表。可以向 COPY 语句传递定义了以下设置的参数:文件中的数据的格式、加载数据时对数据的转换方式、对错误的处理方式以及加载数据的方式。有关详细信息,请参阅 COPY 文档。
在 Vertica ≤ 9.2 版本中创建的数据库中,COPY 支持 DIRECT 选项,该选项指定将数据直接加载到
ROS 而不是 WOS。将大型 (>100MB) 文件加载到数据库时使用此选项;否则,负载可能会填满 WOS。发生这种情况时, Tuple Mover 必须对 WOS 数据执行 移出 操作。直接加载到 ROS 更高效,并且可避免强制执行 moveout。
在 Vertica 9.3 中创建的数据库中,Vertica 忽略加载选项和提示,并始终使用 DIRECT 加载方法。≥ 10.0 版本创建的数据库不再支持 WOS 和移出操作;所有数据总是直接加载到 ROS 中。
只有 超级用户可以使用 COPY 语句复制存储在主机上的文件,因此您必须使用超级用户帐户连接到数据库。如果要以非超级用户身份批量加载数据,您可以使用 COPY 从主机上的流(例如 STDIN)而非文件加载数据,或者也可以从客户端以流式传输数据(请参阅通过 JDBC 进行流式数据传输)。您还可以执行使用预定义的语句的批量插入(此为标准方式),这种方式在后台使用 COPY 语句来加载数据。
ON ANY NODE 时,请确认所有节点上的源文件均相同。使用不同的文件会导致生成的结果不一致。
以下示例演示了通过 JDBC 使用 COPY 语句将名为 customers.txt 的文件加载到新的数据库表。该文件必须存储在应用程序连接到的数据库主机(在此示例中是名为 VerticaHost 的主机)上。
import java.sql.*;
import java.util.Properties;
import com.vertica.jdbc.*;
public class COPYFromFile {
public static void main(String[] args) {
Properties myProp = new Properties();
myProp.put("user", "ExampleAdmin"); // Must be superuser
myProp.put("password", "password123");
Connection conn;
try {
conn = DriverManager.getConnection(
"jdbc:vertica://VerticaHost:5433/ExampleDB",myProp);
// Disable AutoCommit
conn.setAutoCommit(false);
Statement stmt = conn.createStatement();
// Create a table to hold data.
stmt.execute("DROP TABLE IF EXISTS customers;");
stmt.execute("CREATE TABLE IF NOT EXISTS customers (Last_Name char(50) "
+ "NOT NULL, First_Name char(50),Email char(50), "
+ "Phone_Number char(15))");
// Use the COPY command to load data. Use ENFORCELENGTH to reject
// strings too wide for their columns.
boolean result = stmt.execute("COPY customers FROM "
+ " '/data/customers.txt' ENFORCELENGTH");
// Determine if execution returned a count value, or a full result
// set.
if (result) {
System.out.println("Got result set");
} else {
// Count will usually return the count of rows inserted.
System.out.println("Got count");
int rowCount = stmt.getUpdateCount();
System.out.println("Number of accepted rows = " + rowCount);
}
// Commit the data load
conn.commit();
} catch (SQLException e) {
System.out.print("Error: ");
System.out.println(e.toString());
}
}
}
运行此示例后,将在系统控制台上输出以下内容(假设 customers.txt 文件两百万个有效的行):
Number of accepted rows = 2000000
以下两个选项可用于将客户端上的文件中的数据以流式传输到 Vertica 数据库:
使用 VerticaCopyStream 类按照面向对象的方式流式传输数据 - 有关该类的详细信息,请参阅 JDBC 文档
执行 COPY LOCAL SQL 语句以进行流式数据传输
此部分中的主题介绍了使用这些选项的方法。
使用 VerticaCopyStream 类可以将数据从客户端系统流式传输到 Vertica 数据库。它允许您直接使用 COPY,而无需先将数据复制到数据库群集中的主机。使用 COPY 命令从主机加载数据时,需要拥有超级用户权限才能访问主机的文件系统。用于从流加载数据的 COPY 语句不需要超级用户权限,因此客户端可以使用对目标表拥有 INSERT 权限的任何用户帐户进行连接。
若要将流复制到数据库,请执行下列操作:
禁用数据库连接的 AutoCommit 连接参数。
将 VerticaCopyStreamObject 实例化,并至少向其传递数据库连接对象和包含用于加载数据的 COPY 语句的字符串。该语句必须将 STDIN 中的数据复制到表。您可以使用适用于数据加载的任何参数。
VerticaCopyStreamObject 构造函数可以选择使用单个 InputStream 对象或 InputStream 对象的 List。此选项可让您预填充要复制到数据库中的流的列表。
调用 VerticaCopyStreamObject.start(),以启动 COPY 语句并开始以流式传输已添加到 VerticaCopyStreamObject 的任何流中的数据。
调用 VerticaCopyStreamObject.addStream(),以将其他流添加到要发送到数据库的流的列表。然后,可以调用 VerticaCopyStreamObject.execute() 以将它们以流式传输到服务器。
(可选)调用 VerticaCopyStreamObject.getRejects() 以从上一次 .execute() 调用获取拒绝的行的列表。对 .execute() 或 .finish() 的每次调用会重置拒绝列表。
VerticaCopyStreamObject 对象的 COPY 语句中使用了 REJECTED DATA 或 EXCEPTIONS 选项,.getRejects() 将返回空列表。一次只能使用一种方法来跟踪拒绝的行。
完成添加流后,调用 VerticaCopyStreamObject.finish() 以将其余任何流发送到数据库并关闭 COPY 语句。
调用 Connection.commit() 以提交已加载的数据。
VerticaCopyStreamObject.getRejects() 方法将返回一个列表,其中包含上一次 .execute() 方法调用之后拒绝的行的数量。对 .execute() 的每次调用会清除拒绝的行的列表,因此您需要在每次调用 .execute() 之后调用 .getRejects()。由于 .start() 和 .finish() 也会调用 .execute() 以将任何挂起的流发送到服务器,因此您还应在调用这些方法之后调用 .getRejects()。
以下示例演示了将存储在客户端系统上的五个文本文件的内容加载到表中。
import java.io.File;
import java.io.FileInputStream;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import com.vertica.jdbc.VerticaConnection;
import com.vertica.jdbc.VerticaCopyStream;
public class CopyMultipleStreamsExample {
public static void main(String[] args) {
// Note: If running on Java 5, you need to call Class.forName
// to manually load the JDBC driver.
// Set up the properties of the connection
Properties myProp = new Properties();
myProp.put("user", "ExampleUser"); // Must be superuser
myProp.put("password", "password123");
// When performing bulk loads, you should always disable the
// connection's AutoCommit property to ensure the loads happen as
// efficiently as possible by reusing the same COPY command and
// transaction.
myProp.put("AutoCommit", "false");
Connection conn;
try {
conn = DriverManager.getConnection(
"jdbc:vertica://VerticaHost:5433/ExampleDB", myProp);
Statement stmt = conn.createStatement();
// Create a table to receive the data
stmt.execute("DROP TABLE IF EXISTS customers");
stmt.execute("CREATE TABLE customers (Last_Name char(50), "
+ "First_Name char(50),Email char(50), "
+ "Phone_Number char(15))");
// Prepare the query to insert from a stream. This query must use
// the COPY statement to load data from STDIN. Unlike copying from
// a file on the host, you do not need superuser privileges to
// copy a stream. All your user account needs is INSERT privileges
// on the target table.
String copyQuery = "COPY customers FROM STDIN "
+ "DELIMITER '|' ENFORCELENGTH";
// Create an instance of the stream class. Pass in the
// connection and the query string.
VerticaCopyStream stream = new VerticaCopyStream(
(VerticaConnection) conn, copyQuery);
// Keep running count of the number of rejects
int totalRejects = 0;
// start() starts the stream process, and opens the COPY command.
stream.start();
// If you added streams to VerticaCopyStream before calling start(),
// You should check for rejects here (see below). The start() method
// calls execute() to send any pre-queued streams to the server
// once the COPY statement has been created.
// Simple for loop to load 5 text files named customers-1.txt to
// customers-5.txt
for (int loadNum = 1; loadNum <= 5; loadNum++) {
// Prepare the input file stream. Read from a local file.
String filename = "C:\\Data\\customers-" + loadNum + ".txt";
System.out.println("\n\nLoading file: " + filename);
File inputFile = new File(filename);
FileInputStream inputStream = new FileInputStream(inputFile);
// Add stream to the VerticaCopyStream
stream.addStream(inputStream);
// call execute() to load the newly added stream. You could
// add many streams and call execute once to load them all.
// Which method you choose depends mainly on whether you want
// the ability to check the number of rejections as the load
// progresses so you can stop if the number of rejects gets too
// high. Also, high numbers of InputStreams could create a
// resource issue on your client system.
stream.execute();
// Show any rejects from this execution of the stream load
// getRejects() returns a List containing the
// row numbers of rejected rows.
List<Long> rejects = stream.getRejects();
// The size of the list gives you the number of rejected rows.
int numRejects = rejects.size();
totalRejects += numRejects;
System.out.println("Number of rows rejected in load #"
+ loadNum + ": " + numRejects);
// List all of the rows that were rejected.
Iterator<Long> rejit = rejects.iterator();
long linecount = 0;
while (rejit.hasNext()) {
System.out.print("Rejected row #" + ++linecount);
System.out.println(" is row " + rejit.next());
}
}
// Finish closes the COPY command. It returns the number of
// rows inserted.
long results = stream.finish();
System.out.println("Finish returned " + results);
// If you added any streams that hadn't been executed(),
// you should also check for rejects here, since finish()
// calls execute() to
// You can also get the number of rows inserted using
// getRowCount().
System.out.println("Number of rows accepted: "
+ stream.getRowCount());
System.out.println("Total number of rows rejected: " + totalRejects);
// Commit the loaded data
conn.commit();
} catch (Exception e) {
e.printStackTrace();
}
}
}
对部分示例数据运行以上示例后,将生成以下输出:
Loading file: C:\Data\customers-1.txtNumber of rows rejected in load #1: 3
Rejected row #1 is row 3
Rejected row #2 is row 7
Rejected row #3 is row 51
Loading file: C:\Data\customers-2.txt
Number of rows rejected in load #2: 5Rejected row #1 is row 4143
Rejected row #2 is row 6132
Rejected row #3 is row 9998
Rejected row #4 is row 10000
Rejected row #5 is row 10050
Loading file: C:\Data\customers-3.txt
Number of rows rejected in load #3: 9
Rejected row #1 is row 14142
Rejected row #2 is row 16131
Rejected row #3 is row 19999
Rejected row #4 is row 20001
Rejected row #5 is row 20005
Rejected row #6 is row 20049
Rejected row #7 is row 20056
Rejected row #8 is row 20144
Rejected row #9 is row 20236
Loading file: C:\Data\customers-4.txt
Number of rows rejected in load #4: 8
Rejected row #1 is row 23774
Rejected row #2 is row 24141
Rejected row #3 is row 25906
Rejected row #4 is row 26130
Rejected row #5 is row 27317
Rejected row #6 is row 28121
Rejected row #7 is row 29321
Rejected row #8 is row 29998
Loading file: C:\Data\customers-5.txt
Number of rows rejected in load #5: 1
Rejected row #1 is row 39997
Finish returned 39995
Number of rows accepted: 39995
Total number of rows rejected: 26
若要将 COPY LOCAL 与 JDBC 结合使用,只需执行 COPY LOCAL 语句并指定客户端系统上源文件的路径即可。此方法比使用 VerticaCopyStream 类更简单(有关该类的详细信息,请参阅
JDBC 文档)。但是,如果有许多文件要复制到数据库,或者如果数据来自某个文件以外的其他源(例如,通过网络连接进行流式传输),您可能倾向于使用 VerticaCopyStream。
可以在多语句查询中使用 COPY LOCAL。但是,您应当始终将其作为查询中的第一条语句。不应当在同一个查询中多次使用它。
以下示例代码演示了使用 COPY LOCAL 将文件从客户端复制到数据库。除了在 COPY 语句中使用 LOCAL 选项,并且数据文件的路径位于客户端系统而非服务器上,此示例与使用 COPY 语句进行批量加载中所示的代码相同。
import java.sql.*;
import java.util.Properties;
public class COPYLocal {
public static void main(String[] args) {
// Note: If using Java 5, you must call Class.forName to load the
// JDBC driver.
Properties myProp = new Properties();
myProp.put("user", "ExampleUser"); // Do not need to superuser
myProp.put("password", "password123");
Connection conn;
try {
conn = DriverManager.getConnection(
"jdbc:vertica://VerticaHost:5433/ExampleDB",myProp);
// Disable AutoCommit
conn.setAutoCommit(false);
Statement stmt = conn.createStatement();
// Create a table to hold data.
stmt.execute("DROP TABLE IF EXISTS customers;");
stmt.execute("CREATE TABLE IF NOT EXISTS customers (Last_Name char(50) "
+ "NOT NULL, First_Name char(50),Email char(50), "
+ "Phone_Number char(15))");
// Use the COPY command to load data. Load directly into ROS, since
// this load could be over 100MB. Use ENFORCELENGTH to reject
// strings too wide for their columns.
boolean result = stmt.execute("COPY customers FROM LOCAL "
+ " 'C:\\Data\\customers.txt' DIRECT ENFORCELENGTH");
// Determine if execution returned a count value, or a full result
// set.
if (result) {
System.out.println("Got result set");
} else {
// Count will usually return the count of rows inserted.
System.out.println("Got count");
int rowCount = stmt.getUpdateCount();
System.out.println("Number of accepted rows = " + rowCount);
}
conn.close();
} catch (SQLException e) {
System.out.print("Error: ");
System.out.println(e.toString());
}
}
}
运行此代码后的结果如下所示。在此示例中,customers.txt 文件包含 10000 行,其中七个行被拒绝,因为这些行所包含的数据的宽度太大而无法适应其数据库列。
Got countNumber of accepted rows = 9993