这是本节的多页打印视图。 点击此处打印.

返回本页常规视图.

使用 JDBC 预定义的语句批量插入

可以使用预定义的 INSERT 语句(仅需设置一次即可重复调用的服务器端语句)将数据批量加载到 Vertica 中。您可以使用包含表示数据的问号占位符的 SQL 语句将 PreparedStatement 类的成员实例化。例如:

PreparedStatement pstmt = conn.prepareStatement(
                    "INSERT INTO customers(last, first, id) VALUES(?,?,?)");

然后,可以对 PreparedStatement 对象使用特定于数据类型的方法(例如 setString()setInt())来设置参数。设置参数后,调用 addBatch() 方法以将行添加到批中。整个数据批准备就绪后,调用 executeBatch() 方法以执行批量插入。

在后台,批量插入会转换为 COPY 语句。如果已禁用连接的 AutoCommit 参数,则 Vertica 会保持打开 COPY 语句并使用该语句加载后续的批,直至事务已提交或者游标已关闭或应用程序执行任何其他操作(或者使用其他 StatementPreparedStatement 对象执行任何语句)为止。使用单个 COPY 语句进行多个批量插入可以更高效地加载数据。如果要加载多个批,应禁用数据库的 AutoCommit 属性以提高效率。

执行批量插入时,可以试验各个批大小和行大小以确定可提供最佳性能的设置。

以下示例演示了使用预定义的语句批量插入数据。

import java.sql.*;
import java.util.Properties;

public class BatchInsertExample {
    public static void main(String[] args) {
        Properties myProp = new Properties();
        myProp.put("user", "ExampleUser");
        myProp.put("password", "password123");

     //Set streamingBatchInsert to True to enable streaming mode for batch inserts.
     //myProp.put("streamingBatchInsert", "True");

     Connection conn;
        try {
            conn = DriverManager.getConnection(
                            "jdbc:vertica://VerticaHost:5433/ExampleDB",
                            myProp);
            // establish connection and make a table for the data.
            Statement stmt = conn.createStatement();


            // Set AutoCommit to false to allow Vertica to reuse the same
            // COPY statement
            conn.setAutoCommit(false);


            // Drop table and recreate.
            stmt.execute("DROP TABLE IF EXISTS customers CASCADE");
            stmt.execute("CREATE TABLE customers (CustID int, Last_Name"
                            + " char(50), First_Name char(50),Email char(50), "
                            + "Phone_Number char(12))");
            // Some dummy data to insert.
            String[] firstNames = new String[] { "Anna", "Bill", "Cindy",
                            "Don", "Eric" };
            String[] lastNames = new String[] { "Allen", "Brown", "Chu",
                            "Dodd", "Estavez" };
            String[] emails = new String[] { "aang@example.com",
                            "b.brown@example.com", "cindy@example.com",
                            "d.d@example.com", "e.estavez@example.com" };
            String[] phoneNumbers = new String[] { "123-456-7890",
                            "555-444-3333", "555-867-5309",
                            "555-555-1212", "781-555-0000" };
            // Create the prepared statement
            PreparedStatement pstmt = conn.prepareStatement(
                            "INSERT INTO customers (CustID, Last_Name, " +
                            "First_Name, Email, Phone_Number)" +
                            " VALUES(?,?,?,?,?)");
            // Add rows to a batch in a loop. Each iteration adds a
            // new row.
            for (int i = 0; i < firstNames.length; i++) {
                // Add each parameter to the row.
                pstmt.setInt(1, i + 1);
                pstmt.setString(2, lastNames[i]);
                pstmt.setString(3, firstNames[i]);
                pstmt.setString(4, emails[i]);
                pstmt.setString(5, phoneNumbers[i]);
                // Add row to the batch.
                pstmt.addBatch();
            }

            try {
                // Batch is ready, execute it to insert the data
                pstmt.executeBatch();
            } catch (SQLException e) {
                System.out.println("Error message: " + e.getMessage());
                return; // Exit if there was an error
            }

            // Commit the transaction to close the COPY command
            conn.commit();


            // Print the resulting table.
            ResultSet rs = null;
            rs = stmt.executeQuery("SELECT CustID, First_Name, "
                            + "Last_Name FROM customers ORDER BY CustID");
            while (rs.next()) {
                System.out.println(rs.getInt(1) + " - "
                                + rs.getString(2).trim() + " "
                                + rs.getString(3).trim());
            }
            // Cleanup
            conn.close();
        } catch (SQLException e) {
            e.printStackTrace();
        }
    }
}

运行此示例代码后的结果如下所示:

1 - Anna Allen
2 - Bill Brown
3 - Cindy Chu
4 - Don Dodd
5 - Eric Estavez

流式批量插入

默认情况下,Vertica 通过缓存每个行并在用户调用 executeBatch() 方法时插入缓存来执行批量插入。Vertica 也支持流式批量插入。流式批量插入在用户每次调用 addBatch() 时将行添加到数据库中。流式批量插入可提高数据库性能,因为它能够进行并行处理并降低内存需求。

若要启用流式批量插入,请将 streamingBatchInsert 属性设置为 True。以上代码示例包含一个行,该行启用了 streamingBatchInsert 模式。移除 // 注释标记可启用该行并激活流式批量插入。

下表介绍了各种批量插入方法及其默认批量插入模式和流式批量插入模式之间的行为差异。

注意

  • 使用 PreparedStatement.setFloat() 方法会导致出现舍入误差。如果精度很重要,请改为使用 .setDouble() 方法。

  • 预定义语句时,PreparedStatement 对象会缓存连接的 AutoCommit 属性。以后对 AutoCommit 属性进行的更改不会影响预定义的语句。

1 - 批量加载过程中的错误处理

加载各个批时,您可以确定已接受的行数和已拒绝了哪些行(有关详细信息,请参阅确定已接受的行和已拒绝的行)。如果禁用了 AutoCommit 连接设置,则插入各个批时不会发生其他错误(例如磁盘空间错误)。此行为是由单个 SQL COPY 语句对多个连续批执行加载(这样可提高加载过程的效率)导致的。只有 COPY 语句关闭时,才会提交批量数据,而且 Vertica 会报告其他类型错误。

因此,应使批量加载应用程序准备好在 COPY 语句关闭时检查错误。可以通过以下方法促使 COPY 语句关闭:

  • 通过调用 Connection.commit() 来结束批量加载事务

  • 通过使用 Statement.close() 来关闭该语句

  • 在加载中插入最后一批之前将连接的 AutoCommit 属性设置为 true

2 - 确定已接受和已拒绝行 (JDBC)

PreparedStatement.executeBatch 的返回值是一个整数数组,其中包含每个行的插入操作的成功或失败状态。值 1 表示行已被接受,而值 -3 表示行已被拒绝。如果在批处理执行期间出现了异常,您还可以使用 BatchUpdateException.getUpdateCounts() 获取该数组。

以下示例延伸了 使用 JDBC 预定义的语句批量插入 中所示的示例,以检索该数组并显示批量加载的结果。

import java.sql.*;
import java.util.Arrays;
import java.util.Properties;

public class BatchInsertErrorHandlingExample {
    public static void main(String[] args) {
        Properties myProp = new Properties();
        myProp.put("user", "ExampleUser");
        myProp.put("password", "password123");
        Connection conn;

        // establish connection and make a table for the data.
        try {
            conn = DriverManager.getConnection(
                            "jdbc:vertica://VerticaHost:5433/ExampleDB",
                            myProp);


            // Disable auto commit
            conn.setAutoCommit(false);

            // Create a statement
            Statement stmt = conn.createStatement();
            // Drop table and recreate.
            stmt.execute("DROP TABLE IF EXISTS customers CASCADE");
            stmt.execute("CREATE TABLE customers (CustID int, Last_Name"
                            + " char(50), First_Name char(50),Email char(50), "
                            + "Phone_Number char(12))");

            // Some dummy data to insert. The one row won't insert because
            // the phone number is too long for the phone column.
            String[] firstNames = new String[] { "Anna", "Bill", "Cindy",
                            "Don", "Eric" };
            String[] lastNames = new String[] { "Allen", "Brown", "Chu",
                            "Dodd", "Estavez" };
            String[] emails = new String[] { "aang@example.com",
                            "b.brown@example.com", "cindy@example.com",
                            "d.d@example.com", "e.estavez@example.com" };
            String[] phoneNumbers = new String[] { "123-456-789",
                            "555-444-3333", "555-867-53093453453",
                            "555-555-1212", "781-555-0000" };

            // Create the prepared statement
            PreparedStatement pstmt = conn.prepareStatement(
                            "INSERT INTO customers (CustID, Last_Name, " +
                            "First_Name, Email, Phone_Number)" +
                            " VALUES(?,?,?,?,?)");

            // Add rows to a batch in a loop. Each iteration adds a
            // new row.
            for (int i = 0; i < firstNames.length; i++) {
                // Add each parameter to the row.
                pstmt.setInt(1, i + 1);
                pstmt.setString(2, lastNames[i]);
                pstmt.setString(3, firstNames[i]);
                pstmt.setString(4, emails[i]);
                pstmt.setString(5, phoneNumbers[i]);
                // Add row to the batch.
                pstmt.addBatch();
            }

            // Integer array to hold the results of inserting
            // the batch. Will contain an entry for each row,
            // indicating success or failure.
            int[] batchResults = null;

            try {
                // Batch is ready, execute it to insert the data
                batchResults = pstmt.executeBatch();
            } catch (BatchUpdateException e) {
                // We expect an exception here, since one of the
                // inserted phone numbers is too wide for its column. All of the
                // rest of the rows will be inserted.
                System.out.println("Error message: " + e.getMessage());

                // Batch results isn't set due to exception, but you
                // can get it from the exception object.
                //
                // In your own code, you shouldn't assume the a batch
                // exception occurred, since exceptions can be thrown
                // by the server for a variety of reasons.
                batchResults = e.getUpdateCounts();
            }
            // You should also be prepared to catch SQLExceptions in your own
            // application code, to handle dropped connections and other general
            // problems.

            // Commit the transaction
            conn.commit();


            // Print the array holding the results of the batch insertions.
            System.out.println("Return value from inserting batch: "
                            + Arrays.toString(batchResults));
            // Print the resulting table.
            ResultSet rs = null;
            rs = stmt.executeQuery("SELECT CustID, First_Name, "
                            + "Last_Name FROM customers ORDER BY CustID");
            while (rs.next()) {
                System.out.println(rs.getInt(1) + " - "
                                + rs.getString(2).trim() + " "
                                + rs.getString(3).trim());
            }

            // Cleanup
            conn.close();
        } catch (SQLException e) {
            e.printStackTrace();
        }
    }
}

运行以上示例后,将在控制台上生成以下输出:

Error message: [Vertica][VJDBC](100172) One or more rows were rejected by the server.Return value from inserting batch: [1, 1, -3, 1, 1]
1 - Anna Allen
2 - Bill Brown
4 - Don Dodd
5 - Eric Estavez

请注意,第三行插入失败,因为其电话号码对 Phone_Number 列来说太长。该批中其余的所有行(包括该错误之后的行)已正确插入。

3 - 在服务器中回退批量加载

即使一个或多个行被拒绝,批量加载也始终插入所有数据。只有某个批中导致发生错误的行不会加载。如果数据库连接的 AutoCommit 属性为 true,各个批会在完成后自动提交其事务,因此在该批完成加载后,将提交数据。

在某些情况下,您可能希望成功插入某个批中的所有数据(在发生错误时不应提交任何数据)。若要实现此目的,最佳方法是关闭数据库连接的 AutoCommit 属性以防止各个批自动提交其自身。然后,如果某个批遇到错误,您可以在捕获由插入错误导致的 BatchUpdateException 之后回退事务。

以下示例演示了在加载某个批期间发生任何错误时执行回退。

import java.sql.*;
import java.util.Arrays;
import java.util.Properties;

public class RollbackBatchOnError {
    public static void main(String[] args) {
        Properties myProp = new Properties();
        myProp.put("user", "ExampleUser");
        myProp.put("password", "password123");
        Connection conn;
        try {
            conn = DriverManager.getConnection(
                            "jdbc:vertica://VerticaHost:5433/ExampleDB",
                            myProp);
            // Disable auto-commit. This will allow you to roll back a
            // a batch load if there is an error.
            conn.setAutoCommit(false);
            // establish connection and make a table for the data.
            Statement stmt = conn.createStatement();
            // Drop table and recreate.
            stmt.execute("DROP TABLE IF EXISTS customers CASCADE");
            stmt.execute("CREATE TABLE customers (CustID int, Last_Name"
                            + " char(50), First_Name char(50),Email char(50), "
                            + "Phone_Number char(12))");

            // Some dummy data to insert. The one row won't insert because
            // the phone number is too long for the phone column.
            String[] firstNames = new String[] { "Anna", "Bill", "Cindy",
                            "Don", "Eric" };
            String[] lastNames = new String[] { "Allen", "Brown", "Chu",
                            "Dodd", "Estavez" };
            String[] emails = new String[] { "aang@example.com",
                            "b.brown@example.com", "cindy@example.com",
                            "d.d@example.com", "e.estavez@example.com" };
            String[] phoneNumbers = new String[] { "123-456-789",
                            "555-444-3333", "555-867-53094535", "555-555-1212",
                            "781-555-0000" };
            // Create the prepared statement
            PreparedStatement pstmt = conn.prepareStatement(
                            "INSERT INTO customers (CustID, Last_Name, " +
                            "First_Name, Email, Phone_Number) "+
                            "VALUES(?,?,?,?,?)");
            // Add rows to a batch in a loop. Each iteration adds a
            // new row.
            for (int i = 0; i < firstNames.length; i++) {
                // Add each parameter to the row.
                pstmt.setInt(1, i + 1);
                pstmt.setString(2, lastNames[i]);
                pstmt.setString(3, firstNames[i]);
                pstmt.setString(4, emails[i]);
                pstmt.setString(5, phoneNumbers[i]);
                // Add row to the batch.
                pstmt.addBatch();
            }
            // Integer array to hold the results of inserting
            // the batch. Will contain an entry for each row,
            // indicating success or failure.
            int[] batchResults = null;
            try {
                // Batch is ready, execute it to insert the data
                batchResults = pstmt.executeBatch();
                // If we reach here, we inserted the batch without errors.
                // Commit it.
                System.out.println("Batch insert successful. Committing.");
                conn.commit();
            } catch (BatchUpdateException e) {
                    System.out.println("Error message: " + e.getMessage());
                    // Batch results isn't set due to exception, but you
                    // can get it from the exception object.
                    batchResults =  e.getUpdateCounts();
                    // Roll back the batch transaction.
                    System.out.println("Rolling back batch insertion");
                    conn.rollback();
            }
            catch  (SQLException e) {
                // General SQL errors, such as connection issues, throw
                // SQLExceptions. Your application should do something more
                // than just print a stack trace,
                e.printStackTrace();
            }
            System.out.println("Return value from inserting batch: "
                            + Arrays.toString(batchResults));
            System.out.println("Customers table contains:");


            // Print the resulting table.
            ResultSet rs = null;
            rs = stmt.executeQuery("SELECT CustID, First_Name, "
                            + "Last_Name FROM customers ORDER BY CustID");
            while (rs.next()) {
                System.out.println(rs.getInt(1) + " - "
                                + rs.getString(2).trim() + " "
                                + rs.getString(3).trim());
            }

            // Cleanup
            conn.close();
        } catch (SQLException e) {
            e.printStackTrace();
        }
    }
}

运行以上示例后,将在系统控制台上输出以下内容:

Error message: [Vertica][VJDBC](100172) One or more rows were rejected by the server.Rolling back batch insertion
Return value from inserting batch: [1, 1, -3, 1, 1]
Customers table contains:

返回值指示是否已成功插入每个行。值 1 表示已插入该行并且未出现任何问题,而值 -3 表示插入该行失败。

客户表为空,因为已回退该批量插入,原因是第三列导致发生错误。