使用批量插入

可以使用批量插入将数据区块插入到数据库中。通过将数据拆分为多个批,您可以在加载每个批后收到有关任何拒绝的行的信息,从而监控加载进度。若要通过 ODBC 执行批量加载,通常可以将预定义的语句与绑定到数组(包含要加载的数据)结合使用。对于每个批,应将新的数据集加载到数组中,然后执行预定义的语句。

执行批量加载时,Vertica 使用 COPY 语句加载数据。所加载的其他每个批使用同一个 COPY 语句。该语句将保持打开,直至您执行以下操作为止:结束语句,关闭语句的光标或执行非 INSERT 语句。

将单个 COPY 语句用于多个批可以通过以下途径提高批量加载效率:

  • 减少插入各个批的开销

  • 将各个批合并为较大的 ROS 容器

虽然 Vertica 使用单个 COPY 语句插入一个事务中的多个批,但您可以在加载每个批之后查找由于行格式无效或存在数据类型问题而被拒绝的行(如果有)。有关详细信息,请参阅跟踪加载状态 (ODBC)

由于批量加载共享一个 COPY 语句,因此一个批中的错误会导致回退同一个事务中较早的批。

批量插入步骤

应用程序执行 ODBC 批量插入所需完成的步骤如下:

  1. 连接到数据库。

  2. 禁用连接的自动提交。

  3. 创建可插入要加载的数据的预定义的语句。

  4. 将该预定义的语句的参数绑定到包含要加载的数据的数组。

  5. 使用各个批的数据填充数组。

  6. 执行该预定义的语句。

  7. (可选)检查批量加载的结果,以查找拒绝的行。

  8. 重复以上三个步骤,直至已加载所有要加载的数据为止。

  9. 提交事务。

  10. (可选)检查整个批处理事务的结果。

以下示例代码演示了以上步骤的简化版本。

// Some standard headers
#include <stdio.h>
#include <stdlib.h>
// Only needed for Windows clients
// #include <windows.h>
// Standard ODBC headers
#include <sql.h>
#include <sqltypes.h>
#include <sqlext.h>
int main()
{
    // Number of data rows to insert
    const int NUM_ENTRIES = 4;

    // Set up the ODBC environment
    SQLRETURN ret;
    SQLHENV hdlEnv;
    ret = SQLAllocHandle(SQL_HANDLE_ENV, SQL_NULL_HANDLE, &hdlEnv);
    if(!SQL_SUCCEEDED(ret)) {
        printf("Could not allocate a handle.\n");
        exit(EXIT_FAILURE);
    } else {
        printf("Allocated an environment handle.\n");
    }
    // Tell ODBC that the application uses ODBC 3.
    ret = SQLSetEnvAttr(hdlEnv, SQL_ATTR_ODBC_VERSION,
        (SQLPOINTER) SQL_OV_ODBC3, SQL_IS_UINTEGER);
    if(!SQL_SUCCEEDED(ret)) {
        printf("Could not set application version to ODBC3.\n");
        exit(EXIT_FAILURE);
    } else {
        printf("Set application to ODBC 3.\n");
    }
    // Allocate a database handle.
    SQLHDBC hdlDbc;
    ret = SQLAllocHandle(SQL_HANDLE_DBC, hdlEnv, &hdlDbc);
    if(!SQL_SUCCEEDED(ret)) {
        printf("Could not allocate database handle.\n");
        exit(EXIT_FAILURE);
    } else {
        printf("Allocated Database handle.\n");
    }
    // Connect to the database
    printf("Connecting to database.\n");
    const char *dsnName = "ExampleDB";
    const char* userID = "dbadmin";
    const char* passwd = "password123";
    ret = SQLConnect(hdlDbc, (SQLCHAR*)dsnName,
        SQL_NTS,(SQLCHAR*)userID,SQL_NTS,
        (SQLCHAR*)passwd, SQL_NTS);
    if(!SQL_SUCCEEDED(ret)) {
        printf("Could not connect to database.\n");
        exit(EXIT_FAILURE);
    } else {
        printf("Connected to database.\n");
    }


    // Disable AUTOCOMMIT
    printf("Disabling autocommit.\n");
    ret = SQLSetConnectAttr(hdlDbc, SQL_ATTR_AUTOCOMMIT, SQL_AUTOCOMMIT_OFF,
                            SQL_NTS);
    if(!SQL_SUCCEEDED(ret)) {
        printf("Could not disable autocommit.\n");
        exit(EXIT_FAILURE);
    }

    // Set up a statement handle
    SQLHSTMT hdlStmt;
    SQLAllocHandle(SQL_HANDLE_STMT, hdlDbc, &hdlStmt);

    // Create a table to hold the data
    SQLExecDirect(hdlStmt, (SQLCHAR*)"DROP TABLE IF EXISTS customers",
        SQL_NTS);
    SQLExecDirect(hdlStmt, (SQLCHAR*)"CREATE TABLE customers "
        "(CustID int, CustName varchar(100), Phone_Number char(15));",
        SQL_NTS);

    // Create the prepared statement. This will insert data into the
    // table we created above.
    printf("Creating prepared statement\n");
    ret = SQLPrepare (hdlStmt, (SQLTCHAR*)"INSERT INTO customers (CustID, "
        "CustName,  Phone_Number) VALUES(?,?,?)", SQL_NTS) ;
    if(!SQL_SUCCEEDED(ret)) {
        printf("Could not create prepared statement\n");
        exit(EXIT_FAILURE);
    } else {
        printf("Created prepared statement.\n");
    }
    // This is the data to be inserted into the database.
    SQLCHAR custNames[][50] = { "Allen, Anna", "Brown, Bill", "Chu, Cindy",
        "Dodd, Don" };
    SQLINTEGER custIDs[] = { 100, 101, 102, 103};
    SQLCHAR phoneNums[][15] = {"1-617-555-1234", "1-781-555-1212",
        "1-508-555-4321", "1-617-555-4444"};
    // Bind the data arrays to the parameters in the prepared SQL
    // statement. First is the custID.
    ret = SQLBindParameter(hdlStmt, 1, SQL_PARAM_INPUT, SQL_C_LONG, SQL_INTEGER,
        0, 0, (SQLPOINTER)custIDs, sizeof(SQLINTEGER) , NULL);
    if(!SQL_SUCCEEDED(ret)) {
        printf("Could not bind custID array\n");
        exit(EXIT_FAILURE);
    } else {
        printf("Bound CustIDs array to prepared statement\n");
    }
    // Bind CustNames
    ret = SQLBindParameter(hdlStmt, 2, SQL_PARAM_INPUT, SQL_C_CHAR, SQL_VARCHAR,
        50, 0, (SQLPOINTER)custNames, 50, NULL);
    if(!SQL_SUCCEEDED(ret)) {
        printf("Could not bind custNames\n");
        exit(EXIT_FAILURE);
    } else {
        printf("Bound CustNames array to prepared statement\n");
    }
    // Bind phoneNums
    ret = SQLBindParameter(hdlStmt, 3, SQL_PARAM_INPUT, SQL_C_CHAR, SQL_CHAR,
        15, 0, (SQLPOINTER)phoneNums, 15, NULL);
    if(!SQL_SUCCEEDED(ret)) {
        printf("Could not bind phoneNums\n");
        exit(EXIT_FAILURE);
    } else {
        printf("Bound phoneNums array to prepared statement\n");
    }
    // Tell the ODBC driver how many rows we have in the
    // array.
    ret = SQLSetStmtAttr( hdlStmt, SQL_ATTR_PARAMSET_SIZE,
        (SQLPOINTER)NUM_ENTRIES, 0 );
    if(!SQL_SUCCEEDED(ret)) {
        printf("Could not bind set parameter size\n");
        exit(EXIT_FAILURE);
    } else {
        printf("Bound phoneNums array to prepared statement\n");
    }

    // Add multiple batches to the database. This just adds the same
    // batch of data four times for simplicity's sake. Each call adds
    // the 4 rows into the database.
    for (int batchLoop=1; batchLoop<=5; batchLoop++) {
        // Execute the prepared statement, loading all of the data
        // in the arrays.
        printf("Adding Batch #%d...", batchLoop);
        ret = SQLExecute(hdlStmt);
        if(!SQL_SUCCEEDED(ret)) {
           printf("not successful!\n");
        }  else {
            printf("successful.\n");
        }
    }
    // Done with batches, commit the transaction
    printf("Committing transaction\n");
    ret = SQLEndTran(SQL_HANDLE_DBC, hdlDbc, SQL_COMMIT);
    if(!SQL_SUCCEEDED(ret)) {
        printf("Could not commit transaction\n");
    }  else {
        printf("Committed transaction\n");
    }

    // Clean up
    printf("Free handles.\n");
    ret = SQLDisconnect( hdlDbc );
    if(!SQL_SUCCEEDED(ret)) {
        printf("Error disconnecting. Transaction still open?\n");
        exit(EXIT_FAILURE);
    }
    SQLFreeHandle(SQL_HANDLE_STMT, hdlStmt);
    SQLFreeHandle(SQL_HANDLE_DBC, hdlDbc);
    SQLFreeHandle(SQL_HANDLE_ENV, hdlEnv);
    exit(EXIT_SUCCESS);
}

运行以上代码后的结果如下所示。

Allocated an environment handle.
Set application to ODBC 3.
Allocated Database handle.
Connecting to database.
Connected to database.
Creating prepared statement
Created prepared statement.
Bound CustIDs array to prepared statement
Bound CustNames array to prepared statement
Bound phoneNums array to prepared statement
Adding Batch #1...successful.
Adding Batch #2...successful.
Adding Batch #3...successful.
Adding Batch #4...successful.
Adding Batch #5...successful.
Committing transaction
Committed transaction
Free handles.

结果表如下所示:

=> SELECT * FROM customers;
 CustID |  CustName   |  Phone_Number
--------+-------------+-----------------
    100 | Allen, Anna | 1-617-555-1234
    101 | Brown, Bill | 1-781-555-1212
    102 | Chu, Cindy  | 1-508-555-4321
    103 | Dodd, Don   | 1-617-555-4444
    100 | Allen, Anna | 1-617-555-1234
    101 | Brown, Bill | 1-781-555-1212
    102 | Chu, Cindy  | 1-508-555-4321
    103 | Dodd, Don   | 1-617-555-4444
    100 | Allen, Anna | 1-617-555-1234
    101 | Brown, Bill | 1-781-555-1212
    102 | Chu, Cindy  | 1-508-555-4321
    103 | Dodd, Don   | 1-617-555-4444
    100 | Allen, Anna | 1-617-555-1234
    101 | Brown, Bill | 1-781-555-1212
    102 | Chu, Cindy  | 1-508-555-4321
    103 | Dodd, Don   | 1-617-555-4444
    100 | Allen, Anna | 1-617-555-1234
    101 | Brown, Bill | 1-781-555-1212
    102 | Chu, Cindy  | 1-508-555-4321
    103 | Dodd, Don   | 1-617-555-4444
(20 rows)