使用 VerticaCopyStream

使用 VerticaCopyStream 类可以将数据从客户端系统流式传输到 Vertica 数据库。它允许您直接使用 COPY,而无需先将数据复制到数据库群集中的主机。使用 COPY 命令从主机加载数据时,需要拥有超级用户权限才能访问主机的文件系统。用于从流加载数据的 COPY 语句不需要超级用户权限,因此客户端可以使用对目标表拥有 INSERT 权限的任何用户帐户进行连接。

若要将流复制到数据库,请执行下列操作:

  1. 禁用数据库连接的 AutoCommit 连接参数。

  2. VerticaCopyStreamObject 实例化,并至少向其传递数据库连接对象和包含用于加载数据的 COPY 语句的字符串。该语句必须将 STDIN 中的数据复制到表。您可以使用适用于数据加载的任何参数。

  3. 调用 VerticaCopyStreamObject.start(),以启动 COPY 语句并开始以流式传输已添加到 VerticaCopyStreamObject 的任何流中的数据。

  4. 调用 VerticaCopyStreamObject.addStream(),以将其他流添加到要发送到数据库的流的列表。然后,可以调用 VerticaCopyStreamObject.execute() 以将它们以流式传输到服务器。

  5. (可选)调用 VerticaCopyStreamObject.getRejects() 以从上一次 .execute() 调用获取拒绝的行的列表。对 .execute().finish() 的每次调用会重置拒绝列表。

  6. 完成添加流后,调用 VerticaCopyStreamObject.finish() 以将其余任何流发送到数据库并关闭 COPY 语句。

  7. 调用 Connection.commit() 以提交已加载的数据。

获取拒绝的行

VerticaCopyStreamObject.getRejects() 方法将返回一个列表,其中包含上一次 .execute() 方法调用之后拒绝的行的数量。对 .execute() 的每次调用会清除拒绝的行的列表,因此您需要在每次调用 .execute() 之后调用 .getRejects()。由于 .start().finish() 也会调用 .execute() 以将任何挂起的流发送到服务器,因此您还应在调用这些方法之后调用 .getRejects()

以下示例演示了将存储在客户端系统上的五个文本文件的内容加载到表中。

import java.io.File;
import java.io.FileInputStream;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import com.vertica.jdbc.VerticaConnection;
import com.vertica.jdbc.VerticaCopyStream;

public class CopyMultipleStreamsExample {
    public static void main(String[] args) {
        // Note: If running on Java 5, you need to call Class.forName
        // to manually load the JDBC driver.
        // Set up the properties of the connection
        Properties myProp = new Properties();
        myProp.put("user", "ExampleUser"); // Must be superuser
        myProp.put("password", "password123");
        // When performing bulk loads, you should always disable the
        // connection's AutoCommit property to ensure the loads happen as
        // efficiently as possible by reusing the same COPY command and
        // transaction.
        myProp.put("AutoCommit", "false");
        Connection conn;
        try {
            conn = DriverManager.getConnection(
                          "jdbc:vertica://VerticaHost:5433/ExampleDB", myProp);
            Statement stmt = conn.createStatement();

            // Create a table to receive the data
            stmt.execute("DROP TABLE IF EXISTS customers");
            stmt.execute("CREATE TABLE customers (Last_Name char(50), "
                            + "First_Name char(50),Email char(50), "
                            + "Phone_Number char(15))");

            // Prepare the query to insert from a stream. This query must use
            // the COPY statement to load data from STDIN. Unlike copying from
            // a file on the host, you do not need superuser privileges to
            // copy a stream. All your user account needs is INSERT privileges
            // on the target table.
            String copyQuery = "COPY customers FROM STDIN "
                            + "DELIMITER '|' ENFORCELENGTH";

            // Create an instance of the stream class. Pass in the
            // connection and the query string.
            VerticaCopyStream stream = new VerticaCopyStream(
                            (VerticaConnection) conn, copyQuery);

            // Keep running count of the number of rejects
            int totalRejects = 0;

            // start() starts the stream process, and opens the COPY command.
            stream.start();

            // If you added streams to VerticaCopyStream before calling start(),
            // You should check for rejects here (see below). The start() method
            // calls execute() to send any pre-queued streams to the server
            // once the COPY statement has been created.

            // Simple for loop to load 5 text files named customers-1.txt to
            // customers-5.txt
            for (int loadNum = 1; loadNum <= 5; loadNum++) {
                // Prepare the input file stream. Read from a local file.
                String filename = "C:\\Data\\customers-" + loadNum + ".txt";
                System.out.println("\n\nLoading file: " + filename);
                File inputFile = new File(filename);
                FileInputStream inputStream = new FileInputStream(inputFile);

                // Add stream to the VerticaCopyStream
                stream.addStream(inputStream);

                // call execute() to load the newly added stream. You could
                // add many streams and call execute once to load them all.
                // Which method you choose depends mainly on whether you want
                // the ability to check the number of rejections as the load
                // progresses so you can stop if the number of rejects gets too
                // high. Also, high numbers of InputStreams could create a
                // resource issue on your client system.
                stream.execute();

                // Show any rejects from this execution of the stream load
                // getRejects() returns a List containing the
                // row numbers of rejected rows.
                List<Long> rejects = stream.getRejects();

                // The size of the list gives you the number of rejected rows.
                int numRejects = rejects.size();
                totalRejects += numRejects;
                System.out.println("Number of rows rejected in load #"
                                + loadNum + ": " + numRejects);

                // List all of the rows that were rejected.
                Iterator<Long> rejit = rejects.iterator();
                long linecount = 0;
                while (rejit.hasNext()) {
                    System.out.print("Rejected row #" + ++linecount);
                    System.out.println(" is row " + rejit.next());
                }
            }
            // Finish closes the COPY command. It returns the number of
            // rows inserted.
            long results = stream.finish();
            System.out.println("Finish returned " + results);

            // If you added any streams that hadn't been executed(),
            // you should also check for rejects here, since finish()
            // calls execute() to

            // You can also get the number of rows inserted using
            // getRowCount().
            System.out.println("Number of rows accepted: "
                            + stream.getRowCount());
            System.out.println("Total number of rows rejected: " + totalRejects);

            // Commit the loaded data
            conn.commit();

        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

对部分示例数据运行以上示例后,将生成以下输出:


Loading file: C:\Data\customers-1.txtNumber of rows rejected in load #1: 3
Rejected row #1 is row 3
Rejected row #2 is row 7
Rejected row #3 is row 51
Loading file: C:\Data\customers-2.txt
Number of rows rejected in load #2: 5Rejected row #1 is row 4143
Rejected row #2 is row 6132
Rejected row #3 is row 9998
Rejected row #4 is row 10000
Rejected row #5 is row 10050
Loading file: C:\Data\customers-3.txt
Number of rows rejected in load #3: 9
Rejected row #1 is row 14142
Rejected row #2 is row 16131
Rejected row #3 is row 19999
Rejected row #4 is row 20001
Rejected row #5 is row 20005
Rejected row #6 is row 20049
Rejected row #7 is row 20056
Rejected row #8 is row 20144
Rejected row #9 is row 20236
Loading file: C:\Data\customers-4.txt
Number of rows rejected in load #4: 8
Rejected row #1 is row 23774
Rejected row #2 is row 24141
Rejected row #3 is row 25906
Rejected row #4 is row 26130
Rejected row #5 is row 27317
Rejected row #6 is row 28121
Rejected row #7 is row 29321
Rejected row #8 is row 29998
Loading file: C:\Data\customers-5.txt
Number of rows rejected in load #5: 1
Rejected row #1 is row 39997
Finish returned 39995
Number of rows accepted: 39995
Total number of rows rejected: 26