这是本节的多页打印视图。 点击此处打印.

返回本页常规视图.

加载数据

许多客户端应用程序的主要任务是将数据加载到 Vertica 数据库。有多种不同方法可用于通过 ODBC 插入数据,本节中的主题介绍这些方法。

1 - 使用单行插入

将数据加载到 Vertica 的最简单方法是使用 SQLExecuteDirect 函数运行 INSERT SQL 语句。但此方法仅能插入单个数据行。

ret = SQLExecDirect(hstmt, (SQLTCHAR*)"INSERT into Customers values"
      "(1,'abcda','efgh','1')", SQL_NTS);

2 - 使用预定义的语句

Vertica 支持将服务器端预定义的语句与 ODBC 和 JDBC 结合使用。使用预定义的语句,您只需定义一个语句一次,然后可以使用不同的参数多次运行该语句。要执行的语句包含占位符而非参数。执行语句时,应为每个占位符提供值。

在以下示例查询中,占位符用问号 (?) 表示:

SELECT * FROM public.inventory_fact WHERE product_key = ?

服务器端预定义的语句用于以下用途:

  • 优化查询。Vertica 只需要解析语句一次。

  • 防止 SQL 注入攻击。如果过滤用户输入时未能正确过滤嵌入在 SQL 语句中的字符串字面量转义字符,或者如果用户输入由于未强类型化而意外运行,则将发生 SQL 注入攻击。由于预定义的语句与输入数据分开进行解析,因此数据库不可能意外执行数据。

  • 将直接变量绑定到返回列。通过指向数据结构,代码不必执行额外转换。

以下示例演示了使用预定义的语句执行单一插入。

// Some standard headers
#include <stdio.h>
#include <stdlib.h>
// Only needed for Windows clients
// #include <windows.h>
// Standard ODBC headers
#include <sql.h>
#include <sqltypes.h>
#include <sqlext.h>
// Some constants for the size of the data to be inserted.
#define CUST_NAME_LEN 50
#define PHONE_NUM_LEN 15
#define NUM_ENTRIES 4
int main()
{
    // Set up the ODBC environment
    SQLRETURN ret;
    SQLHENV hdlEnv;
    ret = SQLAllocHandle(SQL_HANDLE_ENV, SQL_NULL_HANDLE, &hdlEnv);
    if(!SQL_SUCCEEDED(ret)) {
        printf("Could not allocate a handle.\n");
        exit(EXIT_FAILURE);
    } else {
        printf("Allocated an environment handle.\n");
    }
    // Tell ODBC that the application uses ODBC 3.
    ret = SQLSetEnvAttr(hdlEnv, SQL_ATTR_ODBC_VERSION,
        (SQLPOINTER) SQL_OV_ODBC3, SQL_IS_UINTEGER);
    if(!SQL_SUCCEEDED(ret)) {
        printf("Could not set application version to ODBC3.\n");
        exit(EXIT_FAILURE);
    } else {
        printf("Set application to ODBC 3.\n");
    }
    // Allocate a database handle.
    SQLHDBC hdlDbc;
    ret = SQLAllocHandle(SQL_HANDLE_DBC, hdlEnv, &hdlDbc);
    // Connect to the database
    printf("Connecting to database.\n");
    const char *dsnName = "ExampleDB";
    const char* userID = "dbadmin";
    const char* passwd = "password123";
    ret = SQLConnect(hdlDbc, (SQLCHAR*)dsnName,
        SQL_NTS,(SQLCHAR*)userID,SQL_NTS,
        (SQLCHAR*)passwd, SQL_NTS);
    if(!SQL_SUCCEEDED(ret)) {
        printf("Could not connect to database.\n");
        exit(EXIT_FAILURE);
    } else {
        printf("Connected to database.\n");
    }

    // Disable AUTOCOMMIT
    printf("Disabling autocommit.\n");
    ret = SQLSetConnectAttr(hdlDbc, SQL_ATTR_AUTOCOMMIT, SQL_AUTOCOMMIT_OFF,
        SQL_NTS);
    if(!SQL_SUCCEEDED(ret)) {
        printf("Could not disable autocommit.\n");
        exit(EXIT_FAILURE);
    }


    // Set up a statement handle
    SQLHSTMT hdlStmt;
    SQLAllocHandle(SQL_HANDLE_STMT, hdlDbc, &hdlStmt);
    SQLExecDirect(hdlStmt, (SQLCHAR*)"DROP TABLE IF EXISTS customers",
        SQL_NTS);
    SQLExecDirect(hdlStmt, (SQLCHAR*)"CREATE TABLE customers "
        "(CustID int, CustName varchar(100), Phone_Number char(15));",
        SQL_NTS);

    // Set up a bunch of variables to be bound to the statement
    // parameters.

    // Create the prepared statement. This will insert data into the
    // table we created above.
    printf("Creating prepared statement\n");
    ret = SQLPrepare (hdlStmt, (SQLTCHAR*)"INSERT INTO customers (CustID, "
        "CustName,  Phone_Number) VALUES(?,?,?)", SQL_NTS) ;
    if(!SQL_SUCCEEDED(ret)) {
        printf("Could not create prepared statement\n");
        SQLFreeHandle(SQL_HANDLE_STMT, hdlStmt);
        SQLFreeHandle(SQL_HANDLE_DBC, hdlDbc);
        SQLFreeHandle(SQL_HANDLE_ENV, hdlEnv);
        exit(EXIT_FAILURE);
    } else {
        printf("Created prepared statement.\n");
    }
    SQLINTEGER custID = 1234;
    SQLCHAR custName[100] = "Fein, Fredrick";
    SQLVARCHAR phoneNum[15] = "555-123-6789";
    SQLLEN strFieldLen = SQL_NTS;
    SQLLEN custIDLen = 0;
    // Bind the data arrays to the parameters in the prepared SQL
    // statement
    ret = SQLBindParameter(hdlStmt, 1, SQL_PARAM_INPUT, SQL_C_LONG, SQL_INTEGER,
        0, 0, &custID, 0 , &custIDLen);
    if(!SQL_SUCCEEDED(ret)) {
        printf("Could not bind custID array\n");
        SQLFreeHandle(SQL_HANDLE_STMT, hdlStmt);
        SQLFreeHandle(SQL_HANDLE_DBC, hdlDbc);
        SQLFreeHandle(SQL_HANDLE_ENV, hdlEnv);
        exit(EXIT_FAILURE);
    } else {
        printf("Bound custID to prepared statement\n");
    }
    // Bind CustNames
    SQLBindParameter(hdlStmt, 2, SQL_PARAM_INPUT, SQL_C_CHAR, SQL_VARCHAR,
        50, 0, (SQLPOINTER)custName,  0, &strFieldLen);
    if(!SQL_SUCCEEDED(ret)) {
        printf("Could not bind custNames\n");
        SQLFreeHandle(SQL_HANDLE_STMT, hdlStmt);
        SQLFreeHandle(SQL_HANDLE_DBC, hdlDbc);
        SQLFreeHandle(SQL_HANDLE_ENV, hdlEnv);
        exit(EXIT_FAILURE);
    } else {
        printf("Bound custName to prepared statement\n");
    }
    // Bind phoneNums
    SQLBindParameter(hdlStmt, 3, SQL_PARAM_INPUT, SQL_C_CHAR, SQL_CHAR,
        15, 0, (SQLPOINTER)phoneNum, 0, &strFieldLen);
    if(!SQL_SUCCEEDED(ret)) {
        printf("Could not bind phoneNums\n");
        SQLFreeHandle(SQL_HANDLE_STMT, hdlStmt);
        SQLFreeHandle(SQL_HANDLE_DBC, hdlDbc);
        SQLFreeHandle(SQL_HANDLE_ENV, hdlEnv);
        exit(EXIT_FAILURE);
    } else {
        printf("Bound phoneNum to prepared statement\n");
    }
    // Execute the prepared statement.
    printf("Running prepared statement...");
    ret = SQLExecute(hdlStmt);
    if(!SQL_SUCCEEDED(ret)) {
        printf("not successful!\n");
    }  else {
        printf("successful.\n");
    }

    // Done with batches, commit the transaction
    printf("Committing transaction\n");
    ret = SQLEndTran(SQL_HANDLE_DBC, hdlDbc, SQL_COMMIT);
    if(!SQL_SUCCEEDED(ret)) {
        printf("Could not commit transaction\n");
    }  else {
        printf("Committed transaction\n");
    }

    // Clean up
    printf("Free handles.\n");
    ret = SQLDisconnect( hdlDbc );
    if(!SQL_SUCCEEDED(ret)) {
        printf("Error disconnecting. Transaction still open?\n");
        exit(EXIT_FAILURE);
    }
    SQLFreeHandle(SQL_HANDLE_STMT, hdlStmt);
    SQLFreeHandle(SQL_HANDLE_DBC, hdlDbc);
    SQLFreeHandle(SQL_HANDLE_ENV, hdlEnv);
    exit(EXIT_SUCCESS);
}

3 - 使用批量插入

可以使用批量插入将数据区块插入到数据库中。通过将数据拆分为多个批,您可以在加载每个批后收到有关任何拒绝的行的信息,从而监控加载进度。若要通过 ODBC 执行批量加载,通常可以将预定义的语句与绑定到数组(包含要加载的数据)结合使用。对于每个批,应将新的数据集加载到数组中,然后执行预定义的语句。

执行批量加载时,Vertica 使用 COPY 语句加载数据。所加载的其他每个批使用同一个 COPY 语句。该语句将保持打开,直至您执行以下操作为止:结束语句,关闭语句的光标或执行非 INSERT 语句。

将单个 COPY 语句用于多个批可以通过以下途径提高批量加载效率:

  • 减少插入各个批的开销

  • 将各个批合并为较大的 ROS 容器

虽然 Vertica 使用单个 COPY 语句插入一个事务中的多个批,但您可以在加载每个批之后查找由于行格式无效或存在数据类型问题而被拒绝的行(如果有)。有关详细信息,请参阅跟踪加载状态 (ODBC)

由于批量加载共享一个 COPY 语句,因此一个批中的错误会导致回退同一个事务中较早的批。

批量插入步骤

应用程序执行 ODBC 批量插入所需完成的步骤如下:

  1. 连接到数据库。

  2. 禁用连接的自动提交。

  3. 创建可插入要加载的数据的预定义的语句。

  4. 将该预定义的语句的参数绑定到包含要加载的数据的数组。

  5. 使用各个批的数据填充数组。

  6. 执行该预定义的语句。

  7. (可选)检查批量加载的结果,以查找拒绝的行。

  8. 重复以上三个步骤,直至已加载所有要加载的数据为止。

  9. 提交事务。

  10. (可选)检查整个批处理事务的结果。

以下示例代码演示了以上步骤的简化版本。

// Some standard headers
#include <stdio.h>
#include <stdlib.h>
// Only needed for Windows clients
// #include <windows.h>
// Standard ODBC headers
#include <sql.h>
#include <sqltypes.h>
#include <sqlext.h>
int main()
{
    // Number of data rows to insert
    const int NUM_ENTRIES = 4;

    // Set up the ODBC environment
    SQLRETURN ret;
    SQLHENV hdlEnv;
    ret = SQLAllocHandle(SQL_HANDLE_ENV, SQL_NULL_HANDLE, &hdlEnv);
    if(!SQL_SUCCEEDED(ret)) {
        printf("Could not allocate a handle.\n");
        exit(EXIT_FAILURE);
    } else {
        printf("Allocated an environment handle.\n");
    }
    // Tell ODBC that the application uses ODBC 3.
    ret = SQLSetEnvAttr(hdlEnv, SQL_ATTR_ODBC_VERSION,
        (SQLPOINTER) SQL_OV_ODBC3, SQL_IS_UINTEGER);
    if(!SQL_SUCCEEDED(ret)) {
        printf("Could not set application version to ODBC3.\n");
        exit(EXIT_FAILURE);
    } else {
        printf("Set application to ODBC 3.\n");
    }
    // Allocate a database handle.
    SQLHDBC hdlDbc;
    ret = SQLAllocHandle(SQL_HANDLE_DBC, hdlEnv, &hdlDbc);
    if(!SQL_SUCCEEDED(ret)) {
        printf("Could not allocate database handle.\n");
        exit(EXIT_FAILURE);
    } else {
        printf("Allocated Database handle.\n");
    }
    // Connect to the database
    printf("Connecting to database.\n");
    const char *dsnName = "ExampleDB";
    const char* userID = "dbadmin";
    const char* passwd = "password123";
    ret = SQLConnect(hdlDbc, (SQLCHAR*)dsnName,
        SQL_NTS,(SQLCHAR*)userID,SQL_NTS,
        (SQLCHAR*)passwd, SQL_NTS);
    if(!SQL_SUCCEEDED(ret)) {
        printf("Could not connect to database.\n");
        exit(EXIT_FAILURE);
    } else {
        printf("Connected to database.\n");
    }


    // Disable AUTOCOMMIT
    printf("Disabling autocommit.\n");
    ret = SQLSetConnectAttr(hdlDbc, SQL_ATTR_AUTOCOMMIT, SQL_AUTOCOMMIT_OFF,
                            SQL_NTS);
    if(!SQL_SUCCEEDED(ret)) {
        printf("Could not disable autocommit.\n");
        exit(EXIT_FAILURE);
    }

    // Set up a statement handle
    SQLHSTMT hdlStmt;
    SQLAllocHandle(SQL_HANDLE_STMT, hdlDbc, &hdlStmt);

    // Create a table to hold the data
    SQLExecDirect(hdlStmt, (SQLCHAR*)"DROP TABLE IF EXISTS customers",
        SQL_NTS);
    SQLExecDirect(hdlStmt, (SQLCHAR*)"CREATE TABLE customers "
        "(CustID int, CustName varchar(100), Phone_Number char(15));",
        SQL_NTS);

    // Create the prepared statement. This will insert data into the
    // table we created above.
    printf("Creating prepared statement\n");
    ret = SQLPrepare (hdlStmt, (SQLTCHAR*)"INSERT INTO customers (CustID, "
        "CustName,  Phone_Number) VALUES(?,?,?)", SQL_NTS) ;
    if(!SQL_SUCCEEDED(ret)) {
        printf("Could not create prepared statement\n");
        exit(EXIT_FAILURE);
    } else {
        printf("Created prepared statement.\n");
    }
    // This is the data to be inserted into the database.
    SQLCHAR custNames[][50] = { "Allen, Anna", "Brown, Bill", "Chu, Cindy",
        "Dodd, Don" };
    SQLINTEGER custIDs[] = { 100, 101, 102, 103};
    SQLCHAR phoneNums[][15] = {"1-617-555-1234", "1-781-555-1212",
        "1-508-555-4321", "1-617-555-4444"};
    // Bind the data arrays to the parameters in the prepared SQL
    // statement. First is the custID.
    ret = SQLBindParameter(hdlStmt, 1, SQL_PARAM_INPUT, SQL_C_LONG, SQL_INTEGER,
        0, 0, (SQLPOINTER)custIDs, sizeof(SQLINTEGER) , NULL);
    if(!SQL_SUCCEEDED(ret)) {
        printf("Could not bind custID array\n");
        exit(EXIT_FAILURE);
    } else {
        printf("Bound CustIDs array to prepared statement\n");
    }
    // Bind CustNames
    ret = SQLBindParameter(hdlStmt, 2, SQL_PARAM_INPUT, SQL_C_CHAR, SQL_VARCHAR,
        50, 0, (SQLPOINTER)custNames, 50, NULL);
    if(!SQL_SUCCEEDED(ret)) {
        printf("Could not bind custNames\n");
        exit(EXIT_FAILURE);
    } else {
        printf("Bound CustNames array to prepared statement\n");
    }
    // Bind phoneNums
    ret = SQLBindParameter(hdlStmt, 3, SQL_PARAM_INPUT, SQL_C_CHAR, SQL_CHAR,
        15, 0, (SQLPOINTER)phoneNums, 15, NULL);
    if(!SQL_SUCCEEDED(ret)) {
        printf("Could not bind phoneNums\n");
        exit(EXIT_FAILURE);
    } else {
        printf("Bound phoneNums array to prepared statement\n");
    }
    // Tell the ODBC driver how many rows we have in the
    // array.
    ret = SQLSetStmtAttr( hdlStmt, SQL_ATTR_PARAMSET_SIZE,
        (SQLPOINTER)NUM_ENTRIES, 0 );
    if(!SQL_SUCCEEDED(ret)) {
        printf("Could not bind set parameter size\n");
        exit(EXIT_FAILURE);
    } else {
        printf("Bound phoneNums array to prepared statement\n");
    }

    // Add multiple batches to the database. This just adds the same
    // batch of data four times for simplicity's sake. Each call adds
    // the 4 rows into the database.
    for (int batchLoop=1; batchLoop<=5; batchLoop++) {
        // Execute the prepared statement, loading all of the data
        // in the arrays.
        printf("Adding Batch #%d...", batchLoop);
        ret = SQLExecute(hdlStmt);
        if(!SQL_SUCCEEDED(ret)) {
           printf("not successful!\n");
        }  else {
            printf("successful.\n");
        }
    }
    // Done with batches, commit the transaction
    printf("Committing transaction\n");
    ret = SQLEndTran(SQL_HANDLE_DBC, hdlDbc, SQL_COMMIT);
    if(!SQL_SUCCEEDED(ret)) {
        printf("Could not commit transaction\n");
    }  else {
        printf("Committed transaction\n");
    }

    // Clean up
    printf("Free handles.\n");
    ret = SQLDisconnect( hdlDbc );
    if(!SQL_SUCCEEDED(ret)) {
        printf("Error disconnecting. Transaction still open?\n");
        exit(EXIT_FAILURE);
    }
    SQLFreeHandle(SQL_HANDLE_STMT, hdlStmt);
    SQLFreeHandle(SQL_HANDLE_DBC, hdlDbc);
    SQLFreeHandle(SQL_HANDLE_ENV, hdlEnv);
    exit(EXIT_SUCCESS);
}

运行以上代码后的结果如下所示。

Allocated an environment handle.
Set application to ODBC 3.
Allocated Database handle.
Connecting to database.
Connected to database.
Creating prepared statement
Created prepared statement.
Bound CustIDs array to prepared statement
Bound CustNames array to prepared statement
Bound phoneNums array to prepared statement
Adding Batch #1...successful.
Adding Batch #2...successful.
Adding Batch #3...successful.
Adding Batch #4...successful.
Adding Batch #5...successful.
Committing transaction
Committed transaction
Free handles.

结果表如下所示:

=> SELECT * FROM customers;
 CustID |  CustName   |  Phone_Number
--------+-------------+-----------------
    100 | Allen, Anna | 1-617-555-1234
    101 | Brown, Bill | 1-781-555-1212
    102 | Chu, Cindy  | 1-508-555-4321
    103 | Dodd, Don   | 1-617-555-4444
    100 | Allen, Anna | 1-617-555-1234
    101 | Brown, Bill | 1-781-555-1212
    102 | Chu, Cindy  | 1-508-555-4321
    103 | Dodd, Don   | 1-617-555-4444
    100 | Allen, Anna | 1-617-555-1234
    101 | Brown, Bill | 1-781-555-1212
    102 | Chu, Cindy  | 1-508-555-4321
    103 | Dodd, Don   | 1-617-555-4444
    100 | Allen, Anna | 1-617-555-1234
    101 | Brown, Bill | 1-781-555-1212
    102 | Chu, Cindy  | 1-508-555-4321
    103 | Dodd, Don   | 1-617-555-4444
    100 | Allen, Anna | 1-617-555-1234
    101 | Brown, Bill | 1-781-555-1212
    102 | Chu, Cindy  | 1-508-555-4321
    103 | Dodd, Don   | 1-617-555-4444
(20 rows)

3.1 - 跟踪加载状态 (ODBC)

加载一批数据之后,客户端应用程序可以获取已处理的行数并确定每个是被接受还是被拒绝。

确定接受的行的数量

若要获取某个批所处理的行数,请将名为 SQL_ATTR_PARAMS_PROCESSED_PTR 且指向用于接收行数的变量的属性添加到语句对象中:

    SQLULEN rowsProcessed;
    SQLSetStmtAttr(hdlStmt, SQL_ATTR_PARAMS_PROCESSED_PTR, &rowsProcessed, 0);

当应用程序调用 SQLExecute() 以插入该批时,Vertica ODBC 驱动程序会将已处理的行数(不一定等于已成功插入的行数)保存到您在 SQL_ATTR_PARAMS_PROCESSED_PTR 语句属性中指定的变量。

查找接受的行和拒绝的行

应用程序还可以设置名为 SQL_ATTR_PARAM_STATUS_PTR 且指向一个数组(ODBC 驱动程序可以将插入每个行的结果存储在此数组中)的语句属性:

    SQLUSMALLINT   rowResults[ NUM_ENTRIES ];
    SQLSetStmtAttr(hdlStmt, SQL_ATTR_PARAM_STATUS_PTR, rowResults, 0);

该数组至少必须与每个批中要插入的行数一样大。

当应用程序调用 SQLExecute 以插入一个批时,ODBC 驱动程序会使用值填充该数组,这些值指示每个行已成功插入(SQL_PARAM_SUCCESS 或 SQL_PARAM_SUCCESS_WITH_INFO)还是遇到了错误 (SQL_PARAM_ERROR)。

以下示例扩展了使用批量插入中所示的示例,以便同时报告已处理的行数和插入的每个行的状态。

// Some standard headers
#include <stdio.h>
#include <stdlib.h>
// Only needed for Windows clients
// #include <windows.h>
// Standard ODBC headers
#include <sql.h>
#include <sqltypes.h>
#include <sqlext.h>
// Helper function to print SQL error messages.
template <typename HandleT>
void reportError(int handleTypeEnum, HandleT hdl)
{
    // Get the status records.
    SQLSMALLINT   i, MsgLen;
    SQLRETURN ret2;
    SQLCHAR       SqlState[6], Msg[SQL_MAX_MESSAGE_LENGTH];
    SQLINTEGER    NativeError;
    i = 1;
    printf("\n");
    while ((ret2 = SQLGetDiagRec(handleTypeEnum, hdl, i, SqlState, &NativeError,
        Msg, sizeof(Msg), &MsgLen)) != SQL_NO_DATA) {
            printf("error record %d\n", i);
            printf("sqlstate: %s\n", SqlState);
            printf("detailed msg: %s\n", Msg);
            printf("native error code: %d\n\n", NativeError);
            i++;
    }
}
int main()
{
    // Number of data rows to insert
    const int NUM_ENTRIES = 4;


    SQLRETURN ret;
    SQLHENV hdlEnv;
    ret = SQLAllocHandle(SQL_HANDLE_ENV, SQL_NULL_HANDLE, &hdlEnv);
    if(!SQL_SUCCEEDED(ret)) {
        printf("Could not allocate a handle.\n");
        exit(EXIT_FAILURE);
    } else {
        printf("Allocated an environment handle.\n");
    }
    ret = SQLSetEnvAttr(hdlEnv, SQL_ATTR_ODBC_VERSION,
        (SQLPOINTER) SQL_OV_ODBC3, SQL_IS_UINTEGER);
    if(!SQL_SUCCEEDED(ret)) {
        printf("Could not set application version to ODBC3.\n");
        exit(EXIT_FAILURE);
    } else {
        printf("Set application to ODBC 3.\n");
    }
    SQLHDBC hdlDbc;
    ret = SQLAllocHandle(SQL_HANDLE_DBC, hdlEnv, &hdlDbc);
    if(!SQL_SUCCEEDED(ret)) {
        printf("Could not allocate database handle.\n");
        exit(EXIT_FAILURE);
    } else {
        printf("Allocated Database handle.\n");
    }
    // Connect to the database
    printf("Connecting to database.\n");
    const char *dsnName = "ExampleDB";
    const char* userID = "dbadmin";
    const char* passwd = "password123";
    ret = SQLConnect(hdlDbc, (SQLCHAR*)dsnName,
        SQL_NTS,(SQLCHAR*)userID,SQL_NTS,
        (SQLCHAR*)passwd, SQL_NTS);
    if(!SQL_SUCCEEDED(ret)) {
        printf("Could not connect to database.\n");
        reportError<SQLHDBC>(SQL_HANDLE_DBC, hdlDbc);
        exit(EXIT_FAILURE);
    } else {
        printf("Connected to database.\n");
    }
    // Set up a statement handle
    SQLHSTMT hdlStmt;
    SQLAllocHandle(SQL_HANDLE_STMT, hdlDbc, &hdlStmt);
    SQLExecDirect(hdlStmt, (SQLCHAR*)"DROP TABLE IF EXISTS customers",
        SQL_NTS);
    // Create a table into which we can store data
    printf("Creating table.\n");
    ret = SQLExecDirect(hdlStmt, (SQLCHAR*)"CREATE TABLE customers "
        "(CustID int, CustName varchar(50), Phone_Number char(15));",
        SQL_NTS);
    if(!SQL_SUCCEEDED(ret)) {
        reportError<SQLHDBC>( SQL_HANDLE_STMT, hdlStmt );
        exit(EXIT_FAILURE);
    } else {
        printf("Created table.\n");
    }
    // Create the prepared statement. This will insert data into the
    // table we created above.
    printf("Creating prepared statement\n");
    ret = SQLPrepare (hdlStmt, (SQLTCHAR*)"INSERT INTO customers (CustID, "
        "CustName,  Phone_Number) VALUES(?,?,?)", SQL_NTS) ;
    if(!SQL_SUCCEEDED(ret)) {
        reportError<SQLHDBC>( SQL_HANDLE_STMT, hdlStmt );
        exit(EXIT_FAILURE);
    } else {
        printf("Created prepared statement.\n");
    }
    // This is the data to be inserted into the database.
    char custNames[][50] = { "Allen, Anna", "Brown, Bill", "Chu, Cindy",
        "Dodd, Don" };
    SQLINTEGER custIDs[] = { 100, 101, 102, 103};
    char phoneNums[][15] = {"1-617-555-1234", "1-781-555-1212",
        "1-508-555-4321", "1-617-555-4444"};
    // Bind the data arrays to the parameters in the prepared SQL
    // statement
    ret = SQLBindParameter(hdlStmt, 1, SQL_PARAM_INPUT, SQL_C_LONG, SQL_INTEGER,
        0, 0, (SQLPOINTER)custIDs, sizeof(SQLINTEGER) , NULL);
    if(!SQL_SUCCEEDED(ret)) {
        reportError<SQLHDBC>( SQL_HANDLE_STMT, hdlStmt );
        exit(EXIT_FAILURE);
    } else {
        printf("Bound CustIDs array to prepared statement\n");
    }
    // Bind CustNames
    SQLBindParameter(hdlStmt, 2, SQL_PARAM_INPUT, SQL_C_CHAR, SQL_VARCHAR,
        50, 0, (SQLPOINTER)custNames, 50, NULL);
    if(!SQL_SUCCEEDED(ret)) {
        reportError<SQLHDBC>( SQL_HANDLE_STMT, hdlStmt );
        exit(EXIT_FAILURE);
    } else {
        printf("Bound CustNames array to prepared statement\n");
    }
    // Bind phoneNums
    SQLBindParameter(hdlStmt, 3, SQL_PARAM_INPUT, SQL_C_CHAR, SQL_CHAR,
        15, 0, (SQLPOINTER)phoneNums, 15, NULL);
    if(!SQL_SUCCEEDED(ret)) {
        reportError<SQLHDBC>( SQL_HANDLE_STMT, hdlStmt );
        exit(EXIT_FAILURE);
    } else {
        printf("Bound phoneNums array to prepared statement\n");
    }
    // Set up a variable to recieve number of parameters processed.
    SQLULEN rowsProcessed;
    // Set a statement attribute to point to the variable
    SQLSetStmtAttr(hdlStmt, SQL_ATTR_PARAMS_PROCESSED_PTR, &rowsProcessed, 0);
    // Set up an array to hold the result of each row insert
    SQLUSMALLINT   rowResults[ NUM_ENTRIES ];
    // Set a statement attribute to point to the array
    SQLSetStmtAttr(hdlStmt, SQL_ATTR_PARAM_STATUS_PTR, rowResults, 0);
    // Tell the ODBC driver how many rows we have in the
    // array.
    SQLSetStmtAttr(hdlStmt, SQL_ATTR_PARAMSET_SIZE, (SQLPOINTER)NUM_ENTRIES, 0);
    // Add multiple batches to the database. This just adds the same
    // batch of data over and over again for simplicity's sake.
    for (int batchLoop=1; batchLoop<=5; batchLoop++) {
        // Execute the prepared statement, loading all of the data
        // in the arrays.
        printf("Adding Batch #%d...", batchLoop);
        ret = SQLExecute(hdlStmt);
        if(!SQL_SUCCEEDED(ret)) {
            reportError<SQLHDBC>( SQL_HANDLE_STMT, hdlStmt );
            exit(EXIT_FAILURE);
        }
        // Number of rows processed is in rowsProcessed
        printf("Params processed: %d\n", rowsProcessed);
        printf("Results of inserting each row:\n");
        int i;
        for (i = 0; i<NUM_ENTRIES; i++) {
            SQLUSMALLINT result = rowResults[i];
            switch(rowResults[i]) {
                case SQL_PARAM_SUCCESS:
                case SQL_PARAM_SUCCESS_WITH_INFO:
                    printf("  Row %d inserted successsfully\n", i+1);
                    break;
                case SQL_PARAM_ERROR:
                    printf("  Row %d was not inserted due to an error.", i+1);
                    break;
                default:
                    printf("  Row %d had some issue with it: %d\n", i+1, result);
            }
        }
    }
    // Done with batches, commit the transaction
    printf("Commit Transaction\n");
    ret = SQLEndTran(SQL_HANDLE_DBC, hdlDbc, SQL_COMMIT);
    if(!SQL_SUCCEEDED(ret)) {
        reportError<SQLHDBC>( SQL_HANDLE_STMT, hdlStmt );
    }


    // Clean up
    printf("Free handles.\n");
    ret = SQLDisconnect( hdlDbc );
    if(!SQL_SUCCEEDED(ret)) {
        printf("Error disconnecting. Transaction still open?\n");
        exit(EXIT_FAILURE);
    }
    SQLFreeHandle(SQL_HANDLE_STMT, hdlStmt);
    SQLFreeHandle(SQL_HANDLE_DBC, hdlDbc);
    SQLFreeHandle(SQL_HANDLE_ENV, hdlEnv);
    exit(EXIT_SUCCESS);
}

运行以上示例代码后,将生成以下输出:

Allocated an environment handle.Set application to ODBC 3.
Allocated Database handle.
Connecting to database.
Connected to database.
Creating table.
Created table.
Creating prepared statement
Created prepared statement.
Bound CustIDs array to prepared statement
Bound CustNames array to prepared statement
Bound phoneNums array to prepared statement
Adding Batch #1...Params processed: 4
Results of inserting each row:
  Row 1 inserted successfully
  Row 2 inserted successfully
  Row 3 inserted successfully
  Row 4 inserted successfully
Adding Batch #2...Params processed: 4
Results of inserting each row:
  Row 1 inserted successfully
  Row 2 inserted successfully
  Row 3 inserted successfully
  Row 4 inserted successfully
Adding Batch #3...Params processed: 4
Results of inserting each row:
  Row 1 inserted successfully
  Row 2 inserted successfully
  Row 3 inserted successfully
  Row 4 inserted successfully
Adding Batch #4...Params processed: 4
Results of inserting each row:
  Row 1 inserted successfully
  Row 2 inserted successfully
  Row 3 inserted successfully
  Row 4 inserted successfully
Adding Batch #5...Params processed: 4
Results of inserting each row:
  Row 1 inserted successfully
  Row 2 inserted successfully
  Row 3 inserted successfully
  Row 4 inserted successfully
Commit Transaction
Free handles.

3.2 - 批量加载过程中的错误处理

加载各个批时,您可以查找有关已接受的行数和已拒绝的行的信息(有关详细信息,请参阅跟踪加载状态 (ODBC))。插入各个批时,不会发生其他错误(例如磁盘空间错误)。此行为是由单个 COPY 语句对多个连续批执行加载导致的。使用单个 COPY 语句可使批量加载过程更快执行。只有 COPY 语句关闭时,才会提交批量数据,而且 Vertica 会报告其他类型错误。

批量加载应用程序应在 COPY 语句关闭时检查错误。一般情况下,可以通过调用 SQLEndTran() 函数以结束事务来强制 COPY 语句关闭。您还可以通过以下方法强制 COPY 语句关闭:使用 SQLCloseCursor() 函数关闭光标,或者在加载中插入最后一批之前将数据库连接的 AutoCommit 属性设置为 true。

4 - 使用 COPY 语句

COPY 允许您将存储在数据库节点上的文件中的数据批量加载到 Vertica 数据库。此方法是将数据加载到 Vertica 的最高效方法,因为文件驻留在数据库服务器上。您必须是超级用户才能使用 COPY 访问数据库节点的文件系统。

以下示例演示了如何使用 COPY 命令。

// Some standard headers
#include <stdio.h>
#include <stdlib.h>
// Only needed for Windows clients
// #include <windows.h>
// Standard ODBC headers
#include <sql.h>
#include <sqltypes.h>
#include <sqlext.h>
// Helper function to determine if an ODBC function call returned
// successfully.
bool notSuccess(SQLRETURN ret) {
    return (ret != SQL_SUCCESS && ret != SQL_SUCCESS_WITH_INFO);
}
int main()
{
    // Set up the ODBC environment
    SQLRETURN ret;
    SQLHENV hdlEnv;
    ret = SQLAllocHandle(SQL_HANDLE_ENV, SQL_NULL_HANDLE, &hdlEnv);
    if(notSuccess(ret)) {
        printf("Could not allocate a handle.\n");
        exit(EXIT_FAILURE);
    } else {
        printf("Allocated an environment handle.\n");
    }
    // Tell ODBC that the application uses ODBC 3.
    ret = SQLSetEnvAttr(hdlEnv, SQL_ATTR_ODBC_VERSION,
        (SQLPOINTER) SQL_OV_ODBC3, SQL_IS_UINTEGER);
    if(notSuccess(ret)) {
        printf("Could not set application version to ODBC3.\n");
        exit(EXIT_FAILURE);
    } else {
        printf("Set application to ODBC 3.\n");
    }
    // Allocate a database handle.
    SQLHDBC hdlDbc;
    ret = SQLAllocHandle(SQL_HANDLE_DBC, hdlEnv, &hdlDbc);
    // Connect to the database
    printf("Connecting to database.\n");
    const char *dsnName = "ExampleDB";

    // Note: User MUST be a database superuser to be able to access files on the
    // filesystem of the node.
    const char* userID = "dbadmin";
    const char* passwd = "password123";
    ret = SQLConnect(hdlDbc, (SQLCHAR*)dsnName,
        SQL_NTS,(SQLCHAR*)userID,SQL_NTS,
        (SQLCHAR*)passwd, SQL_NTS);
    if(notSuccess(ret)) {
        printf("Could not connect to database.\n");
        exit(EXIT_FAILURE);
    } else {
        printf("Connected to database.\n");
    }

    // Disable AUTOCOMMIT
    printf("Disabling autocommit.\n");
    ret = SQLSetConnectAttr(hdlDbc, SQL_ATTR_AUTOCOMMIT, SQL_AUTOCOMMIT_OFF, SQL_NTS);
    if(notSuccess(ret)) {
        printf("Could not disable autocommit.\n");
        exit(EXIT_FAILURE);
    }

    // Set up a statement handle
    SQLHSTMT hdlStmt;
    SQLAllocHandle(SQL_HANDLE_STMT, hdlDbc, &hdlStmt);
    // Create table to hold the data
    SQLExecDirect(hdlStmt, (SQLCHAR*)"DROP TABLE IF EXISTS customers",
        SQL_NTS);
    SQLExecDirect(hdlStmt, (SQLCHAR*)"CREATE TABLE customers"
        "(Last_Name char(50) NOT NULL, First_Name char(50),Email char(50), "
        "Phone_Number char(15));",
        SQL_NTS);

    // Run the copy command to load data.
    ret=SQLExecDirect(hdlStmt, (SQLCHAR*)"COPY customers "
        "FROM '/data/customers.txt'",
        SQL_NTS);
    if(notSuccess(ret)) {
        printf("Data was not successfully loaded.\n");
        exit(EXIT_FAILURE);
    } else {
        // Get number of rows added.
        SQLLEN numRows;
        ret=SQLRowCount(hdlStmt, &numRows);
        printf("Successfully inserted %d rows.\n", numRows);

    }

    // Done with batches, commit the transaction
    printf("Committing transaction\n");
    ret = SQLEndTran(SQL_HANDLE_DBC, hdlDbc, SQL_COMMIT);
    if(notSuccess(ret)) {
        printf("Could not commit transaction\n");
    }  else {
        printf("Committed transaction\n");
    }

    // Clean up
    printf("Free handles.\n");
    SQLFreeHandle(SQL_HANDLE_STMT, hdlStmt);
    SQLFreeHandle(SQL_HANDLE_DBC, hdlDbc);
    SQLFreeHandle(SQL_HANDLE_ENV, hdlEnv);
    exit(EXIT_SUCCESS);
}

该示例在运行后输出了以下内容:

Allocated an environment handle.
Set application to ODBC 3.
Connecting to database.
Connected to database.
Disabling autocommit.
Successfully inserted 10001 rows.
Committing transaction
Committed transaction
Free handles.

5 - 使用 COPY LOCAL 从客户端进行流式数据传输

COPY LOCAL 将数据从客户端系统文件流式传输到 Vertica 数据库。此语句通过 ODBC 驱动程序进行工作,从而简化了将数据文件从客户端传输到服务器的任务。

COPY LOCAL 通过 ODBC 驱动程序以透明方式工作。当客户端应用程序执行 COPY LOCAL 语句时,ODBC 驱动程序将从客户端读取数据文件并将其流式传输到服务器。

此示例演示如何使用 COPY LOCAL 语句从客户端系统加载数据:

// Some standard headers
#include <stdio.h>
#include <stdlib.h>
// Only needed for Windows clients
// #include <windows.h>
// Standard ODBC headers
#include <sql.h>
#include <sqltypes.h>
#include <sqlext.h>
int main()
{
    // Set up the ODBC environment
    SQLRETURN ret;
    SQLHENV hdlEnv;
    ret = SQLAllocHandle(SQL_HANDLE_ENV, SQL_NULL_HANDLE, &hdlEnv);
    if(!SQL_SUCCEEDED(ret)) {
        printf("Could not allocate a handle.\n");
        exit(EXIT_FAILURE);
    } else {
        printf("Allocated an environment handle.\n");
    }
    // Tell ODBC that the application uses ODBC 3.
    ret = SQLSetEnvAttr(hdlEnv, SQL_ATTR_ODBC_VERSION,
        (SQLPOINTER) SQL_OV_ODBC3, SQL_IS_UINTEGER);
    if(!SQL_SUCCEEDED(ret)) {
        printf("Could not set application version to ODBC3.\n");
        exit(EXIT_FAILURE);
    } else {
        printf("Set application to ODBC 3.\n");
    }
    // Allocate a database handle.
    SQLHDBC hdlDbc;
    ret = SQLAllocHandle(SQL_HANDLE_DBC, hdlEnv, &hdlDbc);
    if(!SQL_SUCCEEDED(ret)) {
        printf("Could not aalocate a database handle.\n");
        exit(EXIT_FAILURE);
    } else {
        printf("Set application to ODBC 3.\n");
    }
    // Connect to the database
    printf("Connecting to database.\n");
    const char *dsnName = "ExampleDB";
    const char* userID = "dbadmin";
    const char* passwd = "password123";
    ret = SQLConnect(hdlDbc, (SQLCHAR*)dsnName,
        SQL_NTS,(SQLCHAR*)userID,SQL_NTS,
        (SQLCHAR*)passwd, SQL_NTS);
    if(!SQL_SUCCEEDED(ret)) {
        printf("Could not connect to database.\n");
        exit(EXIT_FAILURE);
    } else {
        printf("Connected to database.\n");
    }

    // Set up a statement handle
    SQLHSTMT hdlStmt;
    SQLAllocHandle(SQL_HANDLE_STMT, hdlDbc, &hdlStmt);


    // Create table to hold the data
    SQLExecDirect(hdlStmt, (SQLCHAR*)"DROP TABLE IF EXISTS customers",
        SQL_NTS);
    SQLExecDirect(hdlStmt, (SQLCHAR*)"CREATE TABLE customers"
        "(Last_Name char(50) NOT NULL, First_Name char(50),Email char(50), "
        "Phone_Number char(15));",
        SQL_NTS);

    // Run the copy command to load data.
    ret=SQLExecDirect(hdlStmt, (SQLCHAR*)"COPY customers "
        "FROM LOCAL '/home/dbadmin/customers.txt'",
        SQL_NTS);
    if(!SQL_SUCCEEDED(ret)) {
        printf("Data was not successfully loaded.\n");
        exit(EXIT_FAILURE);
    } else {
        // Get number of rows added.
        SQLLEN numRows;
        ret=SQLRowCount(hdlStmt, &numRows);
        printf("Successfully inserted %d rows.\n", numRows);
    }

    // COPY commits automatically, unless it is told not to, so
    // there is no need to commit the transaction.

    // Clean up
    printf("Free handles.\n");
    ret = SQLDisconnect( hdlDbc );
    if(!SQL_SUCCEEDED(ret)) {
        printf("Error disconnecting. Transaction still open?\n");
        exit(EXIT_FAILURE);
    }
    SQLFreeHandle(SQL_HANDLE_STMT, hdlStmt);
    SQLFreeHandle(SQL_HANDLE_DBC, hdlDbc);
    SQLFreeHandle(SQL_HANDLE_ENV, hdlEnv);
    exit(EXIT_SUCCESS);
}

除了使用 COPY 语句的 LOCAL 选项从客户端系统而非数据库节点的文件系统加载数据之外,此示例与使用 COPY 语句中所示的示例基本相同。