这是本节的多页打印视图。 点击此处打印.

返回本页常规视图.

C/C++

Vertica 提供开放式数据库连接 (ODBC) 驱动程序,此驱动程序可使应用程序能够连接到 Vertica 数据库。此驱动程序可供自定义编写的客户端应用程序(使用 ODBC API 与 Vertica 交互)使用。ODBC 还可由许多第三方应用程序(包括商业智能应用程序以及提取、转换和加载 (Extract, Transform, and Load, ETL) 应用程序)用来连接到 Vertica。

此部分详细介绍配置 Vertica ODBC 驱动程序的过程。此部分还介绍如何在自己的客户端应用程序中使用 ODBC API 连接到 Vertica。

尽管使用 C、C++、PerlPHP 等编写的客户端应用程序均使用 ODBC 客户端驱动程序连接到 Vertica,但此部分仅阐述 C 和 C++ 应用程序。

1 - ODBC 体系结构

ODBC 架构包含四个层:

  • 客户端应用程序

    此层是一个应用程序,用于通过数据源名称 (DSN) 打开数据源。然后,此应用程序会将请求发送到数据源,并接收这些请求的结果。请求以调用 ODBC 函数的方式发出。

  • 驱动程序管理器

    此层是客户端系统上的一个库,用作客户端应用程序和一个或多个驱动程序之间的中介。该驱动程序管理器执行下列操作:

    • 解析由客户端应用程序提供的 DSN。

    • 加载访问 DSN 中定义的特定数据库所需的驱动程序。

    • 处理来自客户端的 ODBC 函数调用,或将这些调用传递到驱动程序。

    • 从驱动程序检索结果。

    • 在不再需要时卸载驱动程序。

    在 Windows 和 macOS 客户端系统上,驱动程序管理器由操作系统提供。在 Linux 系统上,您通常需要安装驱动程序管理器。有关可在客户端平台上与 Vertica 结合使用的驱动程序管理器的列表,请参阅客户端驱动程序支持

  • Driver

    此层是客户端系统上的一个库,提供对特定数据库的访问。此层可将请求转换为数据库所需的格式,还可将结果重新转换为客户端应用程序所需的格式。

  • 数据库

    数据库可处理在客户端应用程序上启动的请求并返回结果。

2 - ODBC 功能支持

适用于 Vertica 的 ODBC 驱动程序支持 Microsoft ODBC 3.5 规范中定义的大部分功能。以下功能不受支持:

  • 可更新的结果集

  • 向后滚动光标

  • 光标属性

  • 每个连接有多个打开的语句。同时执行的语句必须各自属于不同的连接。例如,当其他语句的结果集处于打开状态时,您无法执行新语句。要使用相同的连接/会话执行另一条语句,请等待当前语句完成执行并关闭其结果集,然后执行新语句。

  • 键集

  • 书签

Vertica ODBC 驱动程序会准确报告其功能。如果需要确定驱动程序是否符合特定功能,您应使用 SQLGetInfo() 函数直接查询驱动程序的功能。

3 - Vertica 和 ODBC 数据类型转换

大多数数据类型均可在 Vertica 和 ODBC 之间透明地进行转换。此部分介绍了几种需要特殊处理的数据类型。

注意

  • GEOMETRY 和 GEOGRAPHY 数据类型被 ODBC 驱动程序视为 LONG VARCHAR 数据。

  • Vertica 支持 ODBC 所支持的标准间隔数据类型。请参阅 Microsoft 的 ODBC 参考文档中的间隔数据类型

  • Vertica 9.0.0 版引入了 UUID 数据类型,包括对 UUID 的 JDBC 支持。Vertica ADO.NET、ODBC 和 OLE DB 客户端在 9.0.1 版中添加了对 UUID 的完全支持。Vertica 保持与不支持 UUID 数据类型的旧 受支持 客户端驱动程序版本的向后兼容性,如下所示:

另请参阅

4 - ODBC 头文件

Vertica ODBC 驱动程序提供了名为 verticaodbc.h 的 C 头文件,该头文件定义了几个可在应用程序中使用的有用常数。使用这些常数,您可以访问和更改特定于 Vertica 的设置。

此文件的位置取决于客户端操作系统:

  • Linux和UNIX系统:/opt/vertica/include
  • Windows系统:C:\Program Files (x86)\Vertica\ODBC\include

下面列出了此文件中定义的常数。

5 - 连接到数据库

在任何 ODBC 应用程序中,第一步总是连接到数据库。创建与使用 ODBC 的数据源的连接时,应使用 DSN(其中包含要使用的驱动程序的详细信息、数据库主机和有关连接到数据源的其他基本信息)的名称。

若要连接到数据库,您需要对应用程序执行以下 4 个步骤:

  1. 调用 SQLAllocHandle() 来为 ODBC 环境分配句柄。此句柄用于创建连接对象和设置应用程序范围设置。

  2. 使用环境句柄设置应用程序要使用的 ODBC 版本。这样可确保数据源知道应用程序将使用哪个 API 与其交互。

  3. 通过调用 SQLAllocHandle() 来分配数据库连接句柄。此句柄代表与特定数据源的连接。

  4. 使用 SQLConnect()SQLDriverConnect() 函数打开与数据库的连接。

创建与数据库的连接时,如果在连接时只需要设置用户名和密码选项,请使用 SQLConnect()。如果要更改诸如区域设置等选项,请使用 SQLDriverConnect()

以下示例演示了使用名为 ExampleDB 的 DSN 连接到数据库。成功创建连接后,此示例仅关闭该连接。

// Demonstrate connecting to Vertica using ODBC.
// Standard i/o library
#include <stdio.h>
#include <stdlib.h>
// Only needed for Windows clients
// #include <windows.h>
// SQL include files that define data types and ODBC API
// functions
#include <sql.h>
#include <sqlext.h>
#include <sqltypes.h>
int main()
{
    SQLRETURN ret;   // Stores return value from ODBC API calls
    SQLHENV hdlEnv;  // Handle for the SQL environment object
    // Allocate an a SQL environment object
    ret = SQLAllocHandle(SQL_HANDLE_ENV, SQL_NULL_HANDLE, &hdlEnv);
    if(!SQL_SUCCEEDED(ret)) {
        printf("Could not allocate a handle.\n");
        exit(EXIT_FAILURE);
    } else {
        printf("Allocated an environment handle.\n");
    }

    // Set the ODBC version we are going to use to
    // 3.
    ret = SQLSetEnvAttr(hdlEnv, SQL_ATTR_ODBC_VERSION,
            (SQLPOINTER) SQL_OV_ODBC3, SQL_IS_UINTEGER);
    if(!SQL_SUCCEEDED(ret)) {
         printf("Could not set application version to ODBC 3.\n");
         exit(EXIT_FAILURE);
    } else {
         printf("Set application version to ODBC 3.\n");
    }
    // Allocate a database handle.
    SQLHDBC hdlDbc;
     ret = SQLAllocHandle(SQL_HANDLE_DBC, hdlEnv, &hdlDbc);
     if(!SQL_SUCCEEDED(ret)) {
          printf("Could not allocate database handle.\n");
          exit(EXIT_FAILURE);
     } else {
          printf("Allocated Database handle.\n");
     }
    // Connect to the database using
    // SQL Connect
    printf("Connecting to database.\n");
    const char *dsnName = "ExampleDB";
    const char* userID = "ExampleUser";
    const char* passwd = "password123";
    ret = SQLConnect(hdlDbc, (SQLCHAR*)dsnName,
        SQL_NTS,(SQLCHAR*)userID,SQL_NTS,
        (SQLCHAR*)passwd, SQL_NTS);
    if(!SQL_SUCCEEDED(ret)) {
        printf("Could not connect to database.\n");
        exit(EXIT_FAILURE);
    } else {
        printf("Connected to database.\n");
    }
    // We're connected. You can do real
    // work here

    // When done, free all of the handles to close them
    // in an orderly fashion.
    printf("Disconnecting and freeing handles.\n");
    ret = SQLDisconnect( hdlDbc );
    if(!SQL_SUCCEEDED(ret)) {
        printf("Error disconnecting from database. Transaction still open?\n");
        exit(EXIT_FAILURE);
    }

    SQLFreeHandle(SQL_HANDLE_DBC, hdlDbc);
    SQLFreeHandle(SQL_HANDLE_ENV, hdlEnv);
    exit(EXIT_SUCCESS);
}

运行以上代码后,将输出以下内容:

Allocated an environment handle.
Set application version to ODBC 3.
Allocated Database handle.
Connecting to database.
Connected to database.
Disconnecting and freeing handles.

有关使用 SQLDriverConnect 连接到数据库的示例,请参阅设置 ODBC 会话的区域设置和编码

注意

  • 如果您使用 DataDirect® 驱动程序管理器,则应在连接到 Vertica 时始终对 SQLDriverConnect 函数的 DriverCompletion 参数(函数调用中的最终参数)使用 SQL_DRIVER_NOPROMPT 值。Linux 和 UNIX 平台上 Vertica 的 ODBC 驱动程序不包含 UI,因此无法提示用户输入密码。

  • 在 Windows 客户端平台上,ODBC 驱动程序会提示用户提供连接信息。有关详细信息,请参阅提示 Windows 用户提供缺少的连接属性

  • 如果数据库不符合 Vertica 许可协议,应用程序将在 SQLConnect() 函数的返回值中收到警告消息。始终应让应用程序检查此返回值是否为 SQL_SUCCESS_WITH_INFO。如果是,应让应用程序提取消息并向用户显示。

6 - 负载均衡

本机连接负载均衡

本机连接负载均衡有助于在 Vertica 数据库中的主机上分散客户端连接所带来的开销。服务器和客户端都必须启用本机连接负载均衡。如果两者者都启用了本机负载均衡,则当客户端最初连接到数据库中的主机时,此主机会从数据库中当前正在运行的主机列表中选择一个主机来处理客户端连接,并通知客户端它所选择的主机。

如果最初联系的主机没有选择自身来处理连接,则客户端会断开连接,然后打开指向第一个主机所选主机的另一个连接。指向此第二个主机的连接进程将照常进行—如果启用了 SSL,则会启动 SSL 协商,否则客户端会启动身份验证过程。有关详细信息,请参阅关于本机连接负载均衡

若要在客户端上启用本机连接负载均衡,请在 DSN 条目或连接字符串中将 ConnectionLoadBalance 连接参数设置为 true。以下示例说明了如何在本机连接负载均衡已启用的情况下多次连接到数据库,以及从 V_MONITOR.CURRENT_SESSION 系统表获取处理连接的节点的名称。

// Demonstrate enabling native load connection balancing.
// Standard i/o library
#include <stdlib.h>
#include <iostream>
#include <assert.h>
// Only needed for Windows clients
// #include <windows.h>
// SQL include files that define data types and ODBC API
// functions
#include <sql.h>
#include <sqlext.h>
#include <sqltypes.h>

using namespace std;
int main()
{
    SQLRETURN ret;   // Stores return value from ODBC API calls
    SQLHENV hdlEnv;  // Handle for the SQL environment object
    // Allocate an a SQL environment object
    ret = SQLAllocHandle(SQL_HANDLE_ENV, SQL_NULL_HANDLE, &hdlEnv);
    assert(SQL_SUCCEEDED(ret));

    // Set the ODBC version we are going to use to
    // 3.
    ret = SQLSetEnvAttr(hdlEnv, SQL_ATTR_ODBC_VERSION,
            (SQLPOINTER) SQL_OV_ODBC3, SQL_IS_UINTEGER);
    assert(SQL_SUCCEEDED(ret));

    // Allocate a database handle.
    SQLHDBC hdlDbc;
    ret = SQLAllocHandle(SQL_HANDLE_DBC, hdlEnv, &hdlDbc);
    assert(SQL_SUCCEEDED(ret));

    // Connect four times. If load balancing is on, client should
    // connect to different nodes.
    for (int x=1; x <= 4; x++) {

        // Connect to the database using SQLDriverConnect. Set
        // ConnectionLoadBalance to 1 (true) to enable load
        // balancing.
        cout << endl << "Connection attempt #" << x << "... ";
        const char *connStr = "DSN=VMart;ConnectionLoadBalance=1;"
            "UID=ExampleUser;PWD=password123";


        ret = SQLDriverConnect(hdlDbc, NULL, (SQLCHAR*)connStr, SQL_NTS,
               NULL, 0, NULL, SQL_DRIVER_NOPROMPT );
        if(!SQL_SUCCEEDED(ret)) {
            cout << "failed. Exiting." << endl;
            exit(EXIT_FAILURE);
        } else {
            cout << "succeeded" << endl;
        }
        // We're connected. Query the v_monitor.current_session table to
        // find the name of the node we've connected to.

        // Set up a statement handle
        SQLHSTMT hdlStmt;
        SQLAllocHandle(SQL_HANDLE_STMT, hdlDbc, &hdlStmt);
        assert(SQL_SUCCEEDED(ret));

        ret = SQLExecDirect( hdlStmt, (SQLCHAR*)"SELECT node_name FROM "
            "V_MONITOR.CURRENT_SESSION;", SQL_NTS );

        if(SQL_SUCCEEDED(ret)) {
            // Bind varible to column in result set.
            SQLTCHAR node_name[256];
            ret = SQLBindCol(hdlStmt, 1, SQL_C_TCHAR, (SQLPOINTER)node_name,
                sizeof(node_name), NULL);
            while(SQL_SUCCEEDED(ret = SQLFetchScroll(hdlStmt, SQL_FETCH_NEXT,1))) {
                // Print the bound variables, which now contain the values from the
                // fetched row.
                cout << "Connected to node " << node_name << endl;
            }
        }
        // Free statement handle
        SQLFreeHandle(SQL_HANDLE_STMT,hdlStmt);
        cout << "Disconnecting." << endl;
        ret = SQLDisconnect( hdlDbc );
        assert(SQL_SUCCEEDED(ret));
    }
    // When done, free all of the handles to close them
    // in an orderly fashion.
    cout << endl << "Freeing handles..." << endl;
    SQLFreeHandle(SQL_HANDLE_DBC, hdlDbc);
    SQLFreeHandle(SQL_HANDLE_ENV, hdlEnv);
    cout << "Done!" << endl;
    exit(EXIT_SUCCESS);
}

运行以上示例后,将生成类似于以下内容的输出:

Connection attempt #1... succeeded
Connected to node v_vmart_node0001
Disconnecting.

Connection attempt #2... succeeded
Connected to node v_vmart_node0002
Disconnecting.

Connection attempt #3... succeeded
Connected to node v_vmart_node0003
Disconnecting.

Connection attempt #4... succeeded
Connected to node v_vmart_node0001
Disconnecting.

Freeing handles...
Done!

基于主机名的负载均衡

您还可以通过将单个主机名解析为多个 IP 地址来平衡工作负载。ODBC 客户端驱动程序通过自动将主机名随机解析为指定的 IP 地址之一来实现负载均衡。

例如,假设主机名 verticahost.example.cometc/hosts 中有以下条目:

192.0.2.0 verticahost.example.com
192.0.2.1 verticahost.example.com
192.0.2.2 verticahost.example.com

指定主机名 verticahost.example.com 会随机解析为列出的 IP 地址之一。

7 - 连接故障转移

如果客户端应用程序尝试连接到 Vertica 群集中处于关闭状态的主机,则使用默认连接配置时连接尝试将会失败。此故障通常会向用户返回一个错误。用户必须等待主机恢复并重试连接,或者手动编辑连接设置以选择其他主机。

由于 Vertica 分析型数据库采用分布式架构,通常您不会关注哪个数据库主机处理客户端应用程序的连接。可以使用客户端驱动程序的连接故障转移功能来防止用户在无法访问连接设置中所指定主机的情况下收到连接错误。JDBC 驱动程序可通过几种方式让客户端驱动程序在无法访问连接参数中所指定的主机时自动尝试连接到其他主机:

  • 将 DNS 服务器配置为返回一个主机名的多个 IP 地址。在连接设置中使用此主机名时,客户端会尝试连接到 DNS 找到的第一个 IP 地址。如果位于该 IP 地址的主机无法访问,则客户端会尝试连接到第二个 IP,依此类推,直到其成功地连接到某个主机或试过所有 IP 地址为止。

  • 提供当您在连接参数中指定的主要主机无法访问时客户端驱动程序尝试连接的备份主机列表。

  • (仅限 JDBC)在尝试连接到下一个节点之前,使用特定于驱动程序的连接属性来管理超时。

在所有方法中,故障转移过程对于客户端应用程序是透明的(除了在选择使用列表故障转移方法时需指定备份主机列表之外)。如果主要主机无法访问,客户端驱动程序会自动尝试连接到其他主机。

故障转移仅适用于初次建立客户端连接的情况。如果连接断开,驱动程序不会自动尝试重新连接到数据库中的其他主机。

通常可选择上述两种故障转移方法中的一种。但它们可以一起使用。如果您的 DNS 服务器返回多个 IP 地址,并且您提供了备份主机列表,则客户端会首先尝试连接 DNS 服务器返回的所有 IP,然后再尝试连接备份列表中的主机。

DNS 故障转移方法可集中处理配置客户端故障转移。在向 Vertica 分析型数据库群集添加新节点时,可以选择通过编辑 DNS 服务器设置来将这些节点添加到故障转移列表中。所有使用 DNS 服务器连接到 Vertica 分析型数据库的客户端系统都会自动采用连接故障转移,而无需更改任何设置。但此方法需要对所有客户端用于连接到 Vertica 分析型数据库群集的 DNS 服务器具有管理访问权限。这在贵组织中可能无法实现。

使用备份服务器列表要比编辑 DNS 服务器设置更加容易。但此方法不能集中处理故障转移功能。如果更改了 Vertica 分析型数据库群集,您可能需要在每个客户端系统上更新应用程序设置。

使用 DNS 故障转移

若要使用 DNS 故障转移,您需要更改 DNS 服务器的设置,将单一主机名映射为 Vertica 分析型数据库群集中主机的多个 IP 地址。然后让所有客户端应用程序都使用该主机名连接到 Vertica 分析型数据库。

您可以选择让 DNS 服务器为主机名返回任何所需数量的 IP 地址。在小型群集中,您可以选择让其返回群集中所有主机的 IP 地址。但对于大型群集,应考虑选择返回一部分主机。否则可能会导致长时间延迟,因为客户端驱动程序尝试连接到数据库中处于关闭状态的每个主机会失败。

使用备份主机列表

若要启用基于备份列表的连接故障转移,需要在客户端应用程序的 BackupServerNode 参数中指定主机的至少一个 IP 地址或主机名。可以视情况在主机名或 IP 后面使用冒号和端口号。如果未提供端口号,驱动程序会默认使用标准 Vertica 端口号 (5433)。若要列出多个主机,请用逗号分隔这些主机。

以下示例演示了如何通过设置 BackupServerNode 连接参数来指定可尝试连接的其他主机。由于有意在连接字符串中使用了一个不存在的节点,因此初始连接失败。客户端驱动程序必须尝试通过备份主机来与 Vertica 建立连接。

// Demonstrate using connection failover.
// Standard i/o library
#include <stdlib.h>
#include <iostream>
#include <assert.h>

// Only needed for Windows clients
// #include <windows.hgt;

// SQL include files that define data types and ODBC API
// functions
#include <sql.h>
#include <sqlext.h>
#include <sqltypes.h>

using namespace std;

int main()
{
    SQLRETURN ret;   // Stores return value from ODBC API calls
    SQLHENV hdlEnv;  // Handle for the SQL environment object
    // Allocate an a SQL environment object
    ret = SQLAllocHandle(SQL_HANDLE_ENV, SQL_NULL_HANDLE, &hdlEnv);
    assert(SQL_SUCCEEDED(ret));

    // Set the ODBC version we are going to use to
    // 3.
    ret = SQLSetEnvAttr(hdlEnv, SQL_ATTR_ODBC_VERSION,
            (SQLPOINTER) SQL_OV_ODBC3, SQL_IS_UINTEGER);
    assert(SQL_SUCCEEDED(ret));

    // Allocate a database handle.
    SQLHDBC hdlDbc;
    ret = SQLAllocHandle(SQL_HANDLE_DBC, hdlEnv, &hdlDbc);
    assert(SQL_SUCCEEDED(ret));

/* DSN for this connection specifies a bad node, and good backup nodes:
[VMartBadNode]
Description=VMart Vertica Database
Driver=/opt/vertica/lib64/libverticaodbc.so
Database=VMart
Servername=badnode.example.com
BackupServerNode=v_vmart_node0002.example.com,v_vmart_node0003.example.com
*/

    // Connect to the database using SQLConnect
    cout << "Connecting to database." << endl;
    const char *dsnName = "VMartBadNode"; // Name of the DSN
    const char* userID = "ExampleUser"; // Username
    const char* passwd = "password123"; // password
    ret = SQLConnect(hdlDbc, (SQLCHAR*)dsnName,
        SQL_NTS,(SQLCHAR*)userID,SQL_NTS,
        (SQLCHAR*)passwd, SQL_NTS);
    if(!SQL_SUCCEEDED(ret)) {
        cout << "Could not connect to database." << endl;
        exit(EXIT_FAILURE);
    } else {
        cout << "Connected to database." << endl;
    }
    // We're connected. Query the v_monitor.current_session table to
    // find the name of the node we've connected to.

    // Set up a statement handle
    SQLHSTMT hdlStmt;
    SQLAllocHandle(SQL_HANDLE_STMT, hdlDbc, &hdlStmt);
    assert(SQL_SUCCEEDED(ret));

    ret = SQLExecDirect( hdlStmt, (SQLCHAR*)"SELECT node_name FROM "
        "v_monitor.current_session;", SQL_NTS );

    if(SQL_SUCCEEDED(ret)) {
        // Bind varible to column in result set.
        SQLTCHAR node_name[256];
        ret = SQLBindCol(hdlStmt, 1, SQL_C_TCHAR, (SQLPOINTER)node_name,
            sizeof(node_name), NULL);
        while(SQL_SUCCEEDED(ret = SQLFetchScroll(hdlStmt, SQL_FETCH_NEXT,1))) {
            // Print the bound variables, which now contain the values from the
            // fetched row.
            cout << "Connected to node " << node_name << endl;
        }
    }

    cout << "Disconnecting." << endl;
    ret = SQLDisconnect( hdlDbc );
    assert(SQL_SUCCEEDED(ret));

    // When done, free all of the handles to close them
    // in an orderly fashion.
    cout << endl << "Freeing handles..." << endl;
    SQLFreeHandle(SQL_HANDLE_STMT,hdlStmt);
    SQLFreeHandle(SQL_HANDLE_DBC, hdlDbc);
    SQLFreeHandle(SQL_HANDLE_ENV, hdlEnv);
    cout << "Done!" << endl;
    exit(EXIT_SUCCESS);
}

运行此示例后,系统控制台上的输出类似于以下内容:

Connecting to database.
Connected to database.
Connected to node v_vmart_node0002
Disconnecting.

Freeing handles...
Done!

请注意,系统会与备份列表中的第一个节点建立连接(节点 2)。

8 - 提示 Windows 用户提供缺少的连接属性

如果缺少必需信息,Vertica Windows ODBC 驱动程序会提示用户提供连接信息。当客户端应用程序调用 SQLDriverConnect 以连接到 Vertica 并且以下任一条件成立时,驱动程序将显示“Vertica 连接对话框 (Vertica Connection Dialog)”:

  • DriverCompletion 属性已设置为 SQL_DRIVER_PROMPT。

  • DriverCompletion 属性已设置为 SQL_DRIVER_COMPLETE 或 SQL_DRIVER_COMPLETE_REQUIRED,且用于建立连接的连接字符串或 DSN 缺少服务器、数据库或端口信息。

如果以上任一条件成立,驱动程序将向用户显示“Vertica 连接对话框 (Vertica Connection Dialog)”以提示用户提供连接信息。

该对话框包含填入的连接字符串或 DSN 中提供的所有属性值。

连接对话框上的必填字段是“数据库 (Database)”、“UID”、“服务器 (Server)”和“端口 (Port)”。填充这些字段后,表单将启用确定 (OK) 按钮。

如果用户在该对话框上单击取消 (Cancel)SQLDriverConnect 函数调用将立即返回 SQL_NO_DATA,而不会尝试连接到 Vertica。如果用户提供的连接信息不完整或不正确,连接函数将在连接尝试失败后返回 SQL_ERROR。

9 - 提示 Windows 用户提供密码

如果向 SQLDriverConnect 函数(客户端应用程序调用此函数以连接到 Vertica)提供的连接字符串或 DSN 缺少连接到数据库所需的任何必需的连接属性,Vertica Windows ODBC 驱动程序将打开一个对话框,提示用户输入缺失的信息(请参阅提示 Windows 用户提供缺少的连接属性)。一般情况下,不将用户密码视为必需的连接属性,因为 Vertica 用户帐户可能不具有密码。如果缺少密码属性,ODBC 驱动程序仍会在未提供密码的情况下尝试连接到 Vertica。

您可以使用 PromptOnNoPassword DSN 参数来强制 ODBC 驱动程序将密码视为必需的连接属性。如果不想在 DSN 条目中存储密码,则此参数很有用。保存在 DSN 条目中的密码是不安全的,因为这些密码作为明文存储在 Windows 注册表中,从而对同一个系统上的其他用户可见。

还有另外两个因素决定 ODBC 驱动程序是否显示 Vertica“连接 (Connection)”对话框。这两个因素如下所示(按优先级顺序):

  • SQLDriverConnect 函数调用的 DriverCompletion 参数。

  • DSN 或连接字符串是否包含密码。

下表显示了 PromptOnNoPassword DSN 参数、SQLDriverConnect 函数的 DriverCompletion 参数以及“DSN 或连接字符串是否包含密码”如何交互以控制是否显示 Vertica“连接 (Connection)”对话框。

以下示例代码演示了如何在 C++ 中将 PromptOnNoPassword DSN 参数和系统 DSN 一起使用:

wstring connectString = L "DSN=VerticaDSN;PromptOnNoPassword=1;";
retcode = SQLDriverConnect(
    hdbc,
    0,
    (SQLWCHAR * ) connectString.c_str(),
    connectString.length(),
    OutConnStr,
    255, &
    amp; OutConnStrLen,
    SQL_DRIVER_COMPLETE);

无密码条目与空密码

连接字符串或 DSN 中不包含密码和包含空密码是两种不同情况。仅当连接字符串或 DSN 不具有 PWD 属性(存放用户密码)时,PromptOnNoPassword DSN 参数才起作用。如果具有该属性,则即使该属性为空,PromptOnNoPassword 也不会提示 Windows ODBC 驱动程序显示 Vertica“连接 (Connection)”对话框。

如果要使用 DSN 为连接提供属性,则此差别会造成混乱。在 Windows ODBC 管理器中为 DSN 连接输入密码并保存该 DSN 连接后,Windows 会将 PWD 属性添加到注册表中的 DSN 定义。如果以后删除该密码,PWD 属性会保留在 DSN 定义中(只会将其值设置为空字符串)。即使您只是使用 ODBC 管理器对话框上的“测试 (Test)”按钮测试 DSN,并且稍后在保存之前清除了该 DSN,也会创建 PWD 属性。

设置了密码后,从 DSN 定义中移除 PWD 属性的唯一方法是使用 Windows 注册表编辑器:

  1. 单击 Windows“开始”菜单,然后单击“运行”。

  2. 在“运行”对话框中,键入 regedit,然后单击“确定”。

  3. 在“注册表编辑器 (Registry Editor)”窗口中,单击“编辑 (Edit)”>“查找 (Find)”(或按 Ctrl+F)。

  4. 在“查找”窗口中,输入要删除其 PWD 属性的 DSN 的名称,然后单击“确定”。

  5. 如果查找操作无法定位到 ODBC.INI 文件夹下的某个文件夹,请单击“编辑 (Edit)”>“查找下一个 (Find Next)”(或按 F3),直至已突出显示与 DSN 的名称匹配的文件夹为止。

  6. 选择 PWD 项,然后按 Delete。

  7. 单击“是”以确认删除该值。

该 DSN 现已不具有 PWD 属性,并且可以在与 PromptOnNoPassword=true 和 DriverConnect=SQL_DRIVER_COMPLETE 一起使用时促使连接对话框显示出来。

10 - 设置 ODBC 会话的区域设置和编码

Vertica 提供以下方法来设置 ODBC 会话的区域设置和编码:

  • 使用 DSN 指定所有已建立的连接的区域设置:

  • SQLDriverConnect() 函数的连接字符串中设置 Locale 连接参数。例如:

    SQLDriverConnect(conn, NULL, (SQLCHAR*)"DSN=Vertica;Locale=en_GB@collation=binary", SQL_NTS, szConnOut, sizeof(szConnOut), &iAvailable, SQL_DRIVER_NOPROMPT)
    
  • 使用 SQLSetConnectAttr() 设置编码和区域设置。通常,您应该始终使用此函数设置编码,而不是在 DSN 中设置它。

    • 传递 SQL_ATTR_VERTICA_LOCALE 常量和 ICU 字符串作为属性值。例如:

      => SQLSetConnectAttr(hdlDbc, SQL_ATTR_VERTICA_LOCALE, (SQLCHAR*)newLocale,
              SQL_NTS);
      
    • 传递 SQL_ATTR_AP_WCHAR_TYPE 常量和编码作为属性值。例如:

      => rc = SQLSetConnectAttr (hdbc, SQL_ATTR_APP_WCHAR_TYPE, (void *)SQL_DD_CP_UTF16, SQL_IS_INTEGER);
      

注意

  • 如果让客户端系统使用非 Unicode 区域设置(例如,在 Linux 平台上设置 LANG=C)并对与 Vertica 的连接使用 Unicode 区域设置,则会生成诸如“(10170) 来自数据源的数据进行字符串数据右截断 (String data right truncation on data from data source)”等错误。如果从 Vertica 收到的数据不采用 UTF-8 格式,驱动程序将基于系统的区域设置来分配字符串内存,并且非 UTF-8 数据会触发超限。如果始终在客户端系统上使用 Unicode 区域设置,您可以避免这些错误。

    如果在连接字符串或 DSN 中指定区域设置,调用连接函数将返回有关成功连接的 SQL_SUCCESS_WITH_INFO,并显示有关区域设置状态的消息。

  • ODBC 应用程序可以处于 ANSI 模式或 Unicode 模式:

    • 如果处于 Unicode 模式,则 ODBC 使用的编码是 UCS-2。

    • 如果处于 ANSI 模式,则数据必须采用单字节 ASCII 编码,此编码与数据库服务器上的 UTF-8 兼容。

    向 Vertica 服务器传递数据时,ODBC 驱动程序会将 UCS-2 转换为 UTF-8,并会将 Vertica 服务器发来的数据从 UTF-8 转换为 UCS-2。

  • 如果最终用户应用程序尚未处于 UCS-2 编码模式,则应用程序应负责将输入数据转换为 UCS-2,否则会出现意外结果。例如:

    • 向 ODBC API 传递非 UCS-2 数据时,如果该数据解释为 UCS-2,将导致向 API 传递无效的 UCS-2 符号,从而生成错误。

    • 或者在备用编码中提供的符号可能是有效的 UCS-2 符号;在这种情况下,会将不正确的数据插入到数据库。

    ODBC 应用程序应使用 SQLSetConnectAttr 设置正确的服务器会话区域设置(如果与数据库范围设置不同),以便在服务器上设置正确的排序规则和字符串函数行为。

以下示例代码演示了同时使用连接字符串和 SQLSetConnectAttr() 函数来设置区域设置。

// Standard i/o library
#include <stdio.h>
#include <stdlib.h>
// Only needed for Windows clients
// #include <windows.h>
// SQL include files that define data types and ODBC API
// functions
#include <sql.h>
#include <sqlext.h>
#include <sqltypes.h>
// Vertica-specific definitions. This include file is located as
// /opt/vertica/include on database hosts.
#include <verticaodbc.h>
int main()
{
    SQLRETURN ret;   // Stores return value from ODBC API calls
    SQLHENV hdlEnv;  // Handle for the SQL environment object
    // Allocate an a SQL environment object
    ret = SQLAllocHandle(SQL_HANDLE_ENV, SQL_NULL_HANDLE, &hdlEnv);
    if(!SQL_SUCCEEDED(ret)) {
        printf("Could not allocate a handle.\n");
        exit(EXIT_FAILURE);
    } else {
        printf("Allocated an environment handle.\n");
    }
    // Set the ODBC version we are going to use to 3.
    ret = SQLSetEnvAttr(hdlEnv, SQL_ATTR_ODBC_VERSION,
        (SQLPOINTER) SQL_OV_ODBC3, SQL_IS_UINTEGER);
    if(!SQL_SUCCEEDED(ret)) {
        printf("Could not set application version to ODBC 3.\n");
        exit(EXIT_FAILURE);
    } else {
        printf("Set application version to ODBC 3.\n");
    }
    // Allocate a database handle.
    SQLHDBC hdlDbc;
    ret = SQLAllocHandle(SQL_HANDLE_DBC, hdlEnv, &hdlDbc);
    if(!SQL_SUCCEEDED(ret)) {
        printf("Could not allocate database handle.\n");
        exit(EXIT_FAILURE);
    } else {
        printf("Allocated Database handle.\n");
    }
    // Connect to the database using SQLDriverConnect
    printf("Connecting to database.\n");
    // Set the locale to English in Great Britain.
    const char *connStr = "DSN=ExampleDB;locale=en_GB;"
        "UID=dbadmin;PWD=password123";
    ret = SQLDriverConnect(hdlDbc, NULL, (SQLCHAR*)connStr, SQL_NTS,
               NULL, 0, NULL, SQL_DRIVER_NOPROMPT );
    if(!SQL_SUCCEEDED(ret)) {
        printf("Could not connect to database.\n");
        exit(EXIT_FAILURE);
    } else {
        printf("Connected to database.\n");
    }
    // Get the Locale
    char locale[256];
    SQLGetConnectAttr(hdlDbc, SQL_ATTR_VERTICA_LOCALE, locale, sizeof(locale),
        0);
    printf("Locale is set to: %s\n", locale);
    // Set the locale to a new value
    const char* newLocale = "en_GB";
    SQLSetConnectAttr(hdlDbc, SQL_ATTR_VERTICA_LOCALE, (SQLCHAR*)newLocale,
        SQL_NTS);

    // Get the Locale again
    SQLGetConnectAttr(hdlDbc, SQL_ATTR_VERTICA_LOCALE, locale, sizeof(locale),
        0);
    printf("Locale is now set to: %s\n", locale);

    // Set the encoding
    SQLSetConnectAttr (hdbc, SQL_ATTR_APP_WCHAR_TYPE, (void *)SQL_DD_CP_UTF16,
        SQL_IS_INTEGER);

    // When done, free all of the handles to close them
    // in an orderly fashion.
    printf("Disconnecting and freeing handles.\n");
    ret = SQLDisconnect( hdlDbc );
    if(!SQL_SUCCEEDED(ret)) {
        printf("Error disconnecting from database. Transaction still open?\n");
        exit(EXIT_FAILURE);
    }
    SQLFreeHandle(SQL_HANDLE_DBC, hdlDbc);
    SQLFreeHandle(SQL_HANDLE_ENV, hdlEnv);
    exit(EXIT_SUCCESS);
}

11 - AUTOCOMMIT 事务和 ODBC 事务

AUTOCOMMIT 连接属性控制 INSERT、ALTER、COPY 和其他数据处理语句是否在完成后自动提交。默认情况下,AUTOCOMMIT 已启用,所有语句将在执行后提交。这通常不是最佳设置,因为效率较低。此外,您通常想要控制是否作为一个整体提交一组语句,而非逐个提交语句。例如,您可能只想在所有插入都成功时才提交一系列插入。如果 AUTOCOMMIT 已禁用,则当其中一个语句失败时,您可以回退事务。

如果 AUTOCOMMIT 已打开,则会在执行语句之后立即提交这些语句的结果。不能回退在 AUTOCOMMIT 模式下执行的语句。

例如,如果 AUTOCOMMIT 已打开,将自动提交以下单个 INSERT 语句:

ret = SQLExecDirect(hdlStmt, (SQLCHAR*)"INSERT INTO customers VALUES(500,"
    "'Smith, Sam', '123-456-789');", SQL_NTS);

如果 AUTOCOMMIT 已关闭,您需要在执行语句之后手动提交事务。例如:

ret = SQLExecDirect(hdlStmt, (SQLCHAR*)"INSERT INTO customers VALUES(500,"
    "'Smith, Sam', '123-456-789');", SQL_NTS);
// Other inserts and data manipulations
// Commit the statements(s)
ret = SQLEndTran(SQL_HANDLE_DBC, hdlDbc, SQL_COMMIT);

只有在调用 SQLEndTran() 时,才会提交插入的行。提交事务之前,您可以随时回退 INSERT 和其他语句。

以下示例演示了关闭 AUTOCOMMIT 以及执行插入和手动提交事务。

// Some standard headers
#include <stdio.h>
#include <stdlib.h>
// Only needed for Windows clients
// #include <windows.h>
// Standard ODBC headers
#include <sql.h>
#include <sqltypes.h>
#include <sqlext.h>
int main()
{
    // Set up the ODBC environment
    SQLRETURN ret;
    SQLHENV hdlEnv;
    ret = SQLAllocHandle(SQL_HANDLE_ENV, SQL_NULL_HANDLE, &hdlEnv);
    if(!SQL_SUCCEEDED(ret)) {
        printf("Could not allocate a handle.\n");
        exit(EXIT_FAILURE);
    } else {
        printf("Allocated an environment handle.\n");
    }
    // Tell ODBC that the application uses ODBC 3.
    ret = SQLSetEnvAttr(hdlEnv, SQL_ATTR_ODBC_VERSION,
        (SQLPOINTER) SQL_OV_ODBC3, SQL_IS_UINTEGER);
    if(!SQL_SUCCEEDED(ret)) {
        printf("Could not set application version to ODBC3.\n");
        exit(EXIT_FAILURE);
    } else {
        printf("Set application to ODBC 3.\n");
    }
    // Allocate a database handle.
    SQLHDBC hdlDbc;
    ret = SQLAllocHandle(SQL_HANDLE_DBC, hdlEnv, &hdlDbc);
    if(!SQL_SUCCEEDED(ret)) {
        printf("Could not allocate database handle.\n");
        exit(EXIT_FAILURE);
    } else {
        printf("Allocated Database handle.\n");
    }
    // Connect to the database
    printf("Connecting to database.\n");
    const char *dsnName = "ExampleDB";
    const char* userID = "dbadmin";
    const char* passwd = "password123";
    ret = SQLConnect(hdlDbc, (SQLCHAR*)dsnName,
        SQL_NTS,(SQLCHAR*)userID,SQL_NTS,
        (SQLCHAR*)passwd, SQL_NTS);
    if(!SQL_SUCCEEDED(ret)) {
        printf("Could not connect to database.\n");
        exit(EXIT_FAILURE);
    } else {
        printf("Connected to database.\n");
    }
    // Get the AUTOCOMMIT state
    SQLINTEGER  autoCommitState;
    SQLGetConnectAttr(hdlDbc, SQL_ATTR_AUTOCOMMIT, &autoCommitState, 0, NULL);
    printf("Autocommit is set to: %d\n", autoCommitState);


    // Disable AUTOCOMMIT
    printf("Disabling autocommit.\n");
    ret = SQLSetConnectAttr(hdlDbc, SQL_ATTR_AUTOCOMMIT, SQL_AUTOCOMMIT_OFF,
        SQL_NTS);
    if(!SQL_SUCCEEDED(ret)) {
        printf("Could not disable autocommit.\n");
        exit(EXIT_FAILURE);
    }

    // Get the AUTOCOMMIT state again
    SQLGetConnectAttr(hdlDbc, SQL_ATTR_AUTOCOMMIT, &autoCommitState, 0, NULL);
    printf("Autocommit is set to: %d\n", autoCommitState);

    // Set up a statement handle
    SQLHSTMT hdlStmt;
    SQLAllocHandle(SQL_HANDLE_STMT, hdlDbc, &hdlStmt);


    // Create a table to hold the data
    SQLExecDirect(hdlStmt, (SQLCHAR*)"DROP TABLE IF EXISTS customers",
        SQL_NTS);
    SQLExecDirect(hdlStmt, (SQLCHAR*)"CREATE TABLE customers "
        "(CustID int, CustName varchar(100), Phone_Number char(15));",
        SQL_NTS);


    // Insert a single row.
    ret = SQLExecDirect(hdlStmt, (SQLCHAR*)"INSERT INTO customers VALUES(500,"
        "'Smith, Sam', '123-456-789');", SQL_NTS);
    if(!SQL_SUCCEEDED(ret)) {
        printf("Could not perform single insert.\n");
    } else {
        printf("Performed single insert.\n");
    }


    // Need to commit the transaction before closing, since autocommit is
    // disabled. Otherwise SQLDisconnect returns an error.
    printf("Committing transaction.\n");
    ret =  SQLEndTran(SQL_HANDLE_DBC, hdlDbc, SQL_COMMIT);
    if(!SQL_SUCCEEDED(ret)) {
        printf("Error committing transaction.\n");
        exit(EXIT_FAILURE);
    }

    // Clean up
    printf("Free handles.\n");
    ret = SQLDisconnect(hdlDbc);
    if(!SQL_SUCCEEDED(ret)) {
        printf("Error disconnecting from database. Transaction still open?\n");
        exit(EXIT_FAILURE);
    }
    SQLFreeHandle(SQL_HANDLE_STMT, hdlStmt);
    SQLFreeHandle(SQL_HANDLE_DBC, hdlDbc);
    SQLFreeHandle(SQL_HANDLE_ENV, hdlEnv);
    exit(EXIT_SUCCESS);
}

运行以上代码后,将生成以下输出:

Allocated an environment handle.
Set application to ODBC 3.
Allocated Database handle.
Connecting to database.
Connected to database.
Autocommit is set to: 1
Disabling autocommit.
Autocommit is set to: 0
Performed single insert.
Committing transaction.
Free handles.

12 - 检索数据

若要通过 ODBC 检索数据,应执行将返回结果集的查询(例如 SELECT),然后使用以下两种方法之一检索结果:

  • 使用 SQLFetch() 函数检索结果集的某个行,然后通过调用 SQLGetData() 访问该行中的列值。

  • 使用 SQLBindColumn() 函数将变量或数组绑定到结果集中的某个列,然后调用 SQLExtendedFetch()SQLFetchScroll() 以读取结果集的某个行并将其值插入到该变量或数组中。

使用以上两种方法时,应在结果集中循环直至到达结尾(由 SQL_NO_DATA 返回状态指示)或遇到错误为止。

以下代码示例演示了如何通过以下过程从 Vertica 检索数据:

  1. 连接到数据库。

  2. 执行将返回所有表的 ID 和名称的 SELECT 语句。

  3. 将两个变量绑定到结果集中的两个列。

  4. 在结果集中循环,并输出 ID 和名称值。

// Demonstrate running a query and getting results by querying the tables
// system table for a list of all tables in the current schema.
// Some standard headers
#include <stdlib.h>
#include <sstream>
#include <iostream>
#include <assert.h>
// Standard ODBC headers
#include <sql.h>
#include <sqltypes.h>
#include <sqlext.h>

// Use std namespace to make output easier
using namespace std;
// Helper function to print SQL error messages.
template <typename HandleT>
void reportError(int handleTypeEnum, HandleT hdl)
{
    // Get the status records.
    SQLSMALLINT   i, MsgLen;
    SQLRETURN ret2;
    SQLCHAR       SqlState[6], Msg[SQL_MAX_MESSAGE_LENGTH];
    SQLINTEGER    NativeError;
    i = 1;
    cout << endl;
    while ((ret2 = SQLGetDiagRec(handleTypeEnum, hdl, i, SqlState, &NativeError,
        Msg, sizeof(Msg), &MsgLen)) != SQL_NO_DATA) {
        cout << "error record #" << i++ << endl;
        cout << "sqlstate: " << SqlState << endl;
        cout << "detailed msg: " << Msg << endl;
        cout << "native error code: " << NativeError << endl;
    }
}

typedef struct {
    SQLHENV hdlEnv;
    SQLHDBC hdlDbc;
} DBConnection;

void connect(DBConnection *pConnInfo)
{
    // Set up the ODBC environment
    SQLRETURN ret;
    ret = SQLAllocHandle(SQL_HANDLE_ENV, SQL_NULL_HANDLE, &pConnInfo->hdlEnv);
    assert(SQL_SUCCEEDED(ret));
    // Tell ODBC that the application uses ODBC 3.
    ret = SQLSetEnvAttr(pConnInfo->hdlEnv, SQL_ATTR_ODBC_VERSION,
        (SQLPOINTER)SQL_OV_ODBC3, SQL_IS_UINTEGER);
    assert(SQL_SUCCEEDED(ret));

    // Allocate a database handle.
    ret = SQLAllocHandle(SQL_HANDLE_DBC, pConnInfo->hdlEnv, &pConnInfo->hdlDbc);
    assert(SQL_SUCCEEDED(ret));
    // Connect to the database
    cout << "Connecting to database." << endl;
    const char* dsnName = "ExampleDB";
    const char* userID = "dbadmin";
    const char* passwd = "password123";
    ret = SQLConnect(pConnInfo->hdlDbc, (SQLCHAR*)dsnName,
        SQL_NTS, (SQLCHAR*)userID, SQL_NTS,
        (SQLCHAR*)passwd, SQL_NTS);
    if (!SQL_SUCCEEDED(ret)) {
        cout << "Could not connect to database" << endl;
        reportError<SQLHDBC>(SQL_HANDLE_DBC, pConnInfo->hdlDbc);
        exit(EXIT_FAILURE);
    }
    else {
        cout << "Connected to database." << endl;
    }
}

void disconnect(DBConnection *pConnInfo)
{
    SQLRETURN ret;
    // Clean up by shutting down the connection
    cout << "Free handles." << endl;
    ret = SQLDisconnect(pConnInfo->hdlDbc);
    if (!SQL_SUCCEEDED(ret)) {
        cout << "Error disconnecting. Transaction still open?" << endl;
        exit(EXIT_FAILURE);
    }
    SQLFreeHandle(SQL_HANDLE_DBC, pConnInfo->hdlDbc);
    SQLFreeHandle(SQL_HANDLE_ENV, pConnInfo->hdlEnv);
}

void executeQuery(SQLHDBC hdlDbc, SQLCHAR* pQuery)
{
    SQLRETURN ret;
    // Set up a statement handle
    SQLHSTMT hdlStmt;
    SQLAllocHandle(SQL_HANDLE_STMT, hdlDbc, &hdlStmt);
    assert(SQL_SUCCEEDED(ret));

    // Execute a query to get the names and IDs of all tables in the schema
    // search p[ath (usually public).
    ret = SQLExecDirect(hdlStmt, pQuery, SQL_NTS);

    if (!SQL_SUCCEEDED(ret)) {
        // Report error an go no further if statement failed.
        cout << "Error executing statement." << endl;
        reportError<SQLHDBC>(SQL_HANDLE_STMT, hdlStmt);
        exit(EXIT_FAILURE);
    }
    else {
        // Query succeeded, so bind two variables to the two colums in the
        // result set,
        cout << "Fetching results..." << endl;
        SQLBIGINT table_id;       // Holds the ID of the table.
        SQLTCHAR table_name[256]; // buffer to hold name of table
        ret = SQLBindCol(hdlStmt, 1, SQL_C_SBIGINT, (SQLPOINTER)&table_id,
            sizeof(table_id), NULL);
        ret = SQLBindCol(hdlStmt, 2, SQL_C_TCHAR, (SQLPOINTER)table_name,
            sizeof(table_name), NULL);

        // Loop through the results,
        while (SQL_SUCCEEDED(ret = SQLFetchScroll(hdlStmt, SQL_FETCH_NEXT, 1))) {
            // Print the bound variables, which now contain the values from the
            // fetched row.
            cout << table_id << " | " << table_name << endl;
        }


        // See if loop exited for reasons other than running out of data
        if (ret != SQL_NO_DATA) {
            // Exited for a reason other than no more data... report the error.
            reportError<SQLHDBC>(SQL_HANDLE_STMT, hdlStmt);
        }
    }
    SQLFreeHandle(SQL_HANDLE_STMT, hdlStmt);
}

int main()
{
    DBConnection conn;

    connect(&conn);
    executeQuery(conn.hdlDbc,
        (SQLCHAR*)"SELECT table_id, table_name FROM tables ORDER BY table_name");
    executeQuery(conn.hdlDbc,
        (SQLCHAR*)"SELECT table_id, table_name FROM tables ORDER BY table_id");
    disconnect(&conn);
    exit(EXIT_SUCCESS);
}

在 vmart 数据库中运行以上示例代码后,将生成类似于以下内容的输出:

Connecting to database.
Connected to database.
Fetching results...
45035996273970908 | call_center_dimension
45035996273970836 | customer_dimension
45035996273972958 | customers
45035996273970848 | date_dimension
45035996273970856 | employee_dimension
45035996273970868 | inventory_fact
45035996273970904 | online_page_dimension
45035996273970912 | online_sales_fact
45035996273970840 | product_dimension
45035996273970844 | promotion_dimension
45035996273970860 | shipping_dimension
45035996273970876 | store_dimension
45035996273970894 | store_orders_fact
45035996273970880 | store_sales_fact
45035996273972806 | t
45035996273970852 | vendor_dimension
45035996273970864 | warehouse_dimension
Fetching results...
45035996273970836 | customer_dimension
45035996273970840 | product_dimension
45035996273970844 | promotion_dimension
45035996273970848 | date_dimension
45035996273970852 | vendor_dimension
45035996273970856 | employee_dimension
45035996273970860 | shipping_dimension
45035996273970864 | warehouse_dimension
45035996273970868 | inventory_fact
45035996273970876 | store_dimension
45035996273970880 | store_sales_fact
45035996273970894 | store_orders_fact
45035996273970904 | online_page_dimension
45035996273970908 | call_center_dimension
45035996273970912 | online_sales_fact
45035996273972806 | t
45035996273972958 | customers
Free handles.

13 - 加载数据

许多客户端应用程序的主要任务是将数据加载到 Vertica 数据库。有多种不同方法可用于通过 ODBC 插入数据,本节中的主题介绍这些方法。

13.1 - 使用单行插入

将数据加载到 Vertica 的最简单方法是使用 SQLExecuteDirect 函数运行 INSERT SQL 语句。但此方法仅能插入单个数据行。

ret = SQLExecDirect(hstmt, (SQLTCHAR*)"INSERT into Customers values"
      "(1,'abcda','efgh','1')", SQL_NTS);

13.2 - 使用预定义的语句

Vertica 支持将服务器端预定义的语句与 ODBC 和 JDBC 结合使用。使用预定义的语句,您只需定义一个语句一次,然后可以使用不同的参数多次运行该语句。要执行的语句包含占位符而非参数。执行语句时,应为每个占位符提供值。

在以下示例查询中,占位符用问号 (?) 表示:

SELECT * FROM public.inventory_fact WHERE product_key = ?

服务器端预定义的语句用于以下用途:

  • 优化查询。Vertica 只需要解析语句一次。

  • 防止 SQL 注入攻击。如果过滤用户输入时未能正确过滤嵌入在 SQL 语句中的字符串字面量转义字符,或者如果用户输入由于未强类型化而意外运行,则将发生 SQL 注入攻击。由于预定义的语句与输入数据分开进行解析,因此数据库不可能意外执行数据。

  • 将直接变量绑定到返回列。通过指向数据结构,代码不必执行额外转换。

以下示例演示了使用预定义的语句执行单一插入。

// Some standard headers
#include <stdio.h>
#include <stdlib.h>
// Only needed for Windows clients
// #include <windows.h>
// Standard ODBC headers
#include <sql.h>
#include <sqltypes.h>
#include <sqlext.h>
// Some constants for the size of the data to be inserted.
#define CUST_NAME_LEN 50
#define PHONE_NUM_LEN 15
#define NUM_ENTRIES 4
int main()
{
    // Set up the ODBC environment
    SQLRETURN ret;
    SQLHENV hdlEnv;
    ret = SQLAllocHandle(SQL_HANDLE_ENV, SQL_NULL_HANDLE, &hdlEnv);
    if(!SQL_SUCCEEDED(ret)) {
        printf("Could not allocate a handle.\n");
        exit(EXIT_FAILURE);
    } else {
        printf("Allocated an environment handle.\n");
    }
    // Tell ODBC that the application uses ODBC 3.
    ret = SQLSetEnvAttr(hdlEnv, SQL_ATTR_ODBC_VERSION,
        (SQLPOINTER) SQL_OV_ODBC3, SQL_IS_UINTEGER);
    if(!SQL_SUCCEEDED(ret)) {
        printf("Could not set application version to ODBC3.\n");
        exit(EXIT_FAILURE);
    } else {
        printf("Set application to ODBC 3.\n");
    }
    // Allocate a database handle.
    SQLHDBC hdlDbc;
    ret = SQLAllocHandle(SQL_HANDLE_DBC, hdlEnv, &hdlDbc);
    // Connect to the database
    printf("Connecting to database.\n");
    const char *dsnName = "ExampleDB";
    const char* userID = "dbadmin";
    const char* passwd = "password123";
    ret = SQLConnect(hdlDbc, (SQLCHAR*)dsnName,
        SQL_NTS,(SQLCHAR*)userID,SQL_NTS,
        (SQLCHAR*)passwd, SQL_NTS);
    if(!SQL_SUCCEEDED(ret)) {
        printf("Could not connect to database.\n");
        exit(EXIT_FAILURE);
    } else {
        printf("Connected to database.\n");
    }

    // Disable AUTOCOMMIT
    printf("Disabling autocommit.\n");
    ret = SQLSetConnectAttr(hdlDbc, SQL_ATTR_AUTOCOMMIT, SQL_AUTOCOMMIT_OFF,
        SQL_NTS);
    if(!SQL_SUCCEEDED(ret)) {
        printf("Could not disable autocommit.\n");
        exit(EXIT_FAILURE);
    }


    // Set up a statement handle
    SQLHSTMT hdlStmt;
    SQLAllocHandle(SQL_HANDLE_STMT, hdlDbc, &hdlStmt);
    SQLExecDirect(hdlStmt, (SQLCHAR*)"DROP TABLE IF EXISTS customers",
        SQL_NTS);
    SQLExecDirect(hdlStmt, (SQLCHAR*)"CREATE TABLE customers "
        "(CustID int, CustName varchar(100), Phone_Number char(15));",
        SQL_NTS);

    // Set up a bunch of variables to be bound to the statement
    // parameters.

    // Create the prepared statement. This will insert data into the
    // table we created above.
    printf("Creating prepared statement\n");
    ret = SQLPrepare (hdlStmt, (SQLTCHAR*)"INSERT INTO customers (CustID, "
        "CustName,  Phone_Number) VALUES(?,?,?)", SQL_NTS) ;
    if(!SQL_SUCCEEDED(ret)) {
        printf("Could not create prepared statement\n");
        SQLFreeHandle(SQL_HANDLE_STMT, hdlStmt);
        SQLFreeHandle(SQL_HANDLE_DBC, hdlDbc);
        SQLFreeHandle(SQL_HANDLE_ENV, hdlEnv);
        exit(EXIT_FAILURE);
    } else {
        printf("Created prepared statement.\n");
    }
    SQLINTEGER custID = 1234;
    SQLCHAR custName[100] = "Fein, Fredrick";
    SQLVARCHAR phoneNum[15] = "555-123-6789";
    SQLLEN strFieldLen = SQL_NTS;
    SQLLEN custIDLen = 0;
    // Bind the data arrays to the parameters in the prepared SQL
    // statement
    ret = SQLBindParameter(hdlStmt, 1, SQL_PARAM_INPUT, SQL_C_LONG, SQL_INTEGER,
        0, 0, &custID, 0 , &custIDLen);
    if(!SQL_SUCCEEDED(ret)) {
        printf("Could not bind custID array\n");
        SQLFreeHandle(SQL_HANDLE_STMT, hdlStmt);
        SQLFreeHandle(SQL_HANDLE_DBC, hdlDbc);
        SQLFreeHandle(SQL_HANDLE_ENV, hdlEnv);
        exit(EXIT_FAILURE);
    } else {
        printf("Bound custID to prepared statement\n");
    }
    // Bind CustNames
    SQLBindParameter(hdlStmt, 2, SQL_PARAM_INPUT, SQL_C_CHAR, SQL_VARCHAR,
        50, 0, (SQLPOINTER)custName,  0, &strFieldLen);
    if(!SQL_SUCCEEDED(ret)) {
        printf("Could not bind custNames\n");
        SQLFreeHandle(SQL_HANDLE_STMT, hdlStmt);
        SQLFreeHandle(SQL_HANDLE_DBC, hdlDbc);
        SQLFreeHandle(SQL_HANDLE_ENV, hdlEnv);
        exit(EXIT_FAILURE);
    } else {
        printf("Bound custName to prepared statement\n");
    }
    // Bind phoneNums
    SQLBindParameter(hdlStmt, 3, SQL_PARAM_INPUT, SQL_C_CHAR, SQL_CHAR,
        15, 0, (SQLPOINTER)phoneNum, 0, &strFieldLen);
    if(!SQL_SUCCEEDED(ret)) {
        printf("Could not bind phoneNums\n");
        SQLFreeHandle(SQL_HANDLE_STMT, hdlStmt);
        SQLFreeHandle(SQL_HANDLE_DBC, hdlDbc);
        SQLFreeHandle(SQL_HANDLE_ENV, hdlEnv);
        exit(EXIT_FAILURE);
    } else {
        printf("Bound phoneNum to prepared statement\n");
    }
    // Execute the prepared statement.
    printf("Running prepared statement...");
    ret = SQLExecute(hdlStmt);
    if(!SQL_SUCCEEDED(ret)) {
        printf("not successful!\n");
    }  else {
        printf("successful.\n");
    }

    // Done with batches, commit the transaction
    printf("Committing transaction\n");
    ret = SQLEndTran(SQL_HANDLE_DBC, hdlDbc, SQL_COMMIT);
    if(!SQL_SUCCEEDED(ret)) {
        printf("Could not commit transaction\n");
    }  else {
        printf("Committed transaction\n");
    }

    // Clean up
    printf("Free handles.\n");
    ret = SQLDisconnect( hdlDbc );
    if(!SQL_SUCCEEDED(ret)) {
        printf("Error disconnecting. Transaction still open?\n");
        exit(EXIT_FAILURE);
    }
    SQLFreeHandle(SQL_HANDLE_STMT, hdlStmt);
    SQLFreeHandle(SQL_HANDLE_DBC, hdlDbc);
    SQLFreeHandle(SQL_HANDLE_ENV, hdlEnv);
    exit(EXIT_SUCCESS);
}

13.3 - 使用批量插入

可以使用批量插入将数据区块插入到数据库中。通过将数据拆分为多个批,您可以在加载每个批后收到有关任何拒绝的行的信息,从而监控加载进度。若要通过 ODBC 执行批量加载,通常可以将预定义的语句与绑定到数组(包含要加载的数据)结合使用。对于每个批,应将新的数据集加载到数组中,然后执行预定义的语句。

执行批量加载时,Vertica 使用 COPY 语句加载数据。所加载的其他每个批使用同一个 COPY 语句。该语句将保持打开,直至您执行以下操作为止:结束语句,关闭语句的光标或执行非 INSERT 语句。

将单个 COPY 语句用于多个批可以通过以下途径提高批量加载效率:

  • 减少插入各个批的开销

  • 将各个批合并为较大的 ROS 容器

虽然 Vertica 使用单个 COPY 语句插入一个事务中的多个批,但您可以在加载每个批之后查找由于行格式无效或存在数据类型问题而被拒绝的行(如果有)。有关详细信息,请参阅跟踪加载状态 (ODBC)

由于批量加载共享一个 COPY 语句,因此一个批中的错误会导致回退同一个事务中较早的批。

批量插入步骤

应用程序执行 ODBC 批量插入所需完成的步骤如下:

  1. 连接到数据库。

  2. 禁用连接的自动提交。

  3. 创建可插入要加载的数据的预定义的语句。

  4. 将该预定义的语句的参数绑定到包含要加载的数据的数组。

  5. 使用各个批的数据填充数组。

  6. 执行该预定义的语句。

  7. (可选)检查批量加载的结果,以查找拒绝的行。

  8. 重复以上三个步骤,直至已加载所有要加载的数据为止。

  9. 提交事务。

  10. (可选)检查整个批处理事务的结果。

以下示例代码演示了以上步骤的简化版本。

// Some standard headers
#include <stdio.h>
#include <stdlib.h>
// Only needed for Windows clients
// #include <windows.h>
// Standard ODBC headers
#include <sql.h>
#include <sqltypes.h>
#include <sqlext.h>
int main()
{
    // Number of data rows to insert
    const int NUM_ENTRIES = 4;

    // Set up the ODBC environment
    SQLRETURN ret;
    SQLHENV hdlEnv;
    ret = SQLAllocHandle(SQL_HANDLE_ENV, SQL_NULL_HANDLE, &hdlEnv);
    if(!SQL_SUCCEEDED(ret)) {
        printf("Could not allocate a handle.\n");
        exit(EXIT_FAILURE);
    } else {
        printf("Allocated an environment handle.\n");
    }
    // Tell ODBC that the application uses ODBC 3.
    ret = SQLSetEnvAttr(hdlEnv, SQL_ATTR_ODBC_VERSION,
        (SQLPOINTER) SQL_OV_ODBC3, SQL_IS_UINTEGER);
    if(!SQL_SUCCEEDED(ret)) {
        printf("Could not set application version to ODBC3.\n");
        exit(EXIT_FAILURE);
    } else {
        printf("Set application to ODBC 3.\n");
    }
    // Allocate a database handle.
    SQLHDBC hdlDbc;
    ret = SQLAllocHandle(SQL_HANDLE_DBC, hdlEnv, &hdlDbc);
    if(!SQL_SUCCEEDED(ret)) {
        printf("Could not allocate database handle.\n");
        exit(EXIT_FAILURE);
    } else {
        printf("Allocated Database handle.\n");
    }
    // Connect to the database
    printf("Connecting to database.\n");
    const char *dsnName = "ExampleDB";
    const char* userID = "dbadmin";
    const char* passwd = "password123";
    ret = SQLConnect(hdlDbc, (SQLCHAR*)dsnName,
        SQL_NTS,(SQLCHAR*)userID,SQL_NTS,
        (SQLCHAR*)passwd, SQL_NTS);
    if(!SQL_SUCCEEDED(ret)) {
        printf("Could not connect to database.\n");
        exit(EXIT_FAILURE);
    } else {
        printf("Connected to database.\n");
    }


    // Disable AUTOCOMMIT
    printf("Disabling autocommit.\n");
    ret = SQLSetConnectAttr(hdlDbc, SQL_ATTR_AUTOCOMMIT, SQL_AUTOCOMMIT_OFF,
                            SQL_NTS);
    if(!SQL_SUCCEEDED(ret)) {
        printf("Could not disable autocommit.\n");
        exit(EXIT_FAILURE);
    }

    // Set up a statement handle
    SQLHSTMT hdlStmt;
    SQLAllocHandle(SQL_HANDLE_STMT, hdlDbc, &hdlStmt);

    // Create a table to hold the data
    SQLExecDirect(hdlStmt, (SQLCHAR*)"DROP TABLE IF EXISTS customers",
        SQL_NTS);
    SQLExecDirect(hdlStmt, (SQLCHAR*)"CREATE TABLE customers "
        "(CustID int, CustName varchar(100), Phone_Number char(15));",
        SQL_NTS);

    // Create the prepared statement. This will insert data into the
    // table we created above.
    printf("Creating prepared statement\n");
    ret = SQLPrepare (hdlStmt, (SQLTCHAR*)"INSERT INTO customers (CustID, "
        "CustName,  Phone_Number) VALUES(?,?,?)", SQL_NTS) ;
    if(!SQL_SUCCEEDED(ret)) {
        printf("Could not create prepared statement\n");
        exit(EXIT_FAILURE);
    } else {
        printf("Created prepared statement.\n");
    }
    // This is the data to be inserted into the database.
    SQLCHAR custNames[][50] = { "Allen, Anna", "Brown, Bill", "Chu, Cindy",
        "Dodd, Don" };
    SQLINTEGER custIDs[] = { 100, 101, 102, 103};
    SQLCHAR phoneNums[][15] = {"1-617-555-1234", "1-781-555-1212",
        "1-508-555-4321", "1-617-555-4444"};
    // Bind the data arrays to the parameters in the prepared SQL
    // statement. First is the custID.
    ret = SQLBindParameter(hdlStmt, 1, SQL_PARAM_INPUT, SQL_C_LONG, SQL_INTEGER,
        0, 0, (SQLPOINTER)custIDs, sizeof(SQLINTEGER) , NULL);
    if(!SQL_SUCCEEDED(ret)) {
        printf("Could not bind custID array\n");
        exit(EXIT_FAILURE);
    } else {
        printf("Bound CustIDs array to prepared statement\n");
    }
    // Bind CustNames
    ret = SQLBindParameter(hdlStmt, 2, SQL_PARAM_INPUT, SQL_C_CHAR, SQL_VARCHAR,
        50, 0, (SQLPOINTER)custNames, 50, NULL);
    if(!SQL_SUCCEEDED(ret)) {
        printf("Could not bind custNames\n");
        exit(EXIT_FAILURE);
    } else {
        printf("Bound CustNames array to prepared statement\n");
    }
    // Bind phoneNums
    ret = SQLBindParameter(hdlStmt, 3, SQL_PARAM_INPUT, SQL_C_CHAR, SQL_CHAR,
        15, 0, (SQLPOINTER)phoneNums, 15, NULL);
    if(!SQL_SUCCEEDED(ret)) {
        printf("Could not bind phoneNums\n");
        exit(EXIT_FAILURE);
    } else {
        printf("Bound phoneNums array to prepared statement\n");
    }
    // Tell the ODBC driver how many rows we have in the
    // array.
    ret = SQLSetStmtAttr( hdlStmt, SQL_ATTR_PARAMSET_SIZE,
        (SQLPOINTER)NUM_ENTRIES, 0 );
    if(!SQL_SUCCEEDED(ret)) {
        printf("Could not bind set parameter size\n");
        exit(EXIT_FAILURE);
    } else {
        printf("Bound phoneNums array to prepared statement\n");
    }

    // Add multiple batches to the database. This just adds the same
    // batch of data four times for simplicity's sake. Each call adds
    // the 4 rows into the database.
    for (int batchLoop=1; batchLoop<=5; batchLoop++) {
        // Execute the prepared statement, loading all of the data
        // in the arrays.
        printf("Adding Batch #%d...", batchLoop);
        ret = SQLExecute(hdlStmt);
        if(!SQL_SUCCEEDED(ret)) {
           printf("not successful!\n");
        }  else {
            printf("successful.\n");
        }
    }
    // Done with batches, commit the transaction
    printf("Committing transaction\n");
    ret = SQLEndTran(SQL_HANDLE_DBC, hdlDbc, SQL_COMMIT);
    if(!SQL_SUCCEEDED(ret)) {
        printf("Could not commit transaction\n");
    }  else {
        printf("Committed transaction\n");
    }

    // Clean up
    printf("Free handles.\n");
    ret = SQLDisconnect( hdlDbc );
    if(!SQL_SUCCEEDED(ret)) {
        printf("Error disconnecting. Transaction still open?\n");
        exit(EXIT_FAILURE);
    }
    SQLFreeHandle(SQL_HANDLE_STMT, hdlStmt);
    SQLFreeHandle(SQL_HANDLE_DBC, hdlDbc);
    SQLFreeHandle(SQL_HANDLE_ENV, hdlEnv);
    exit(EXIT_SUCCESS);
}

运行以上代码后的结果如下所示。

Allocated an environment handle.
Set application to ODBC 3.
Allocated Database handle.
Connecting to database.
Connected to database.
Creating prepared statement
Created prepared statement.
Bound CustIDs array to prepared statement
Bound CustNames array to prepared statement
Bound phoneNums array to prepared statement
Adding Batch #1...successful.
Adding Batch #2...successful.
Adding Batch #3...successful.
Adding Batch #4...successful.
Adding Batch #5...successful.
Committing transaction
Committed transaction
Free handles.

结果表如下所示:

=> SELECT * FROM customers;
 CustID |  CustName   |  Phone_Number
--------+-------------+-----------------
    100 | Allen, Anna | 1-617-555-1234
    101 | Brown, Bill | 1-781-555-1212
    102 | Chu, Cindy  | 1-508-555-4321
    103 | Dodd, Don   | 1-617-555-4444
    100 | Allen, Anna | 1-617-555-1234
    101 | Brown, Bill | 1-781-555-1212
    102 | Chu, Cindy  | 1-508-555-4321
    103 | Dodd, Don   | 1-617-555-4444
    100 | Allen, Anna | 1-617-555-1234
    101 | Brown, Bill | 1-781-555-1212
    102 | Chu, Cindy  | 1-508-555-4321
    103 | Dodd, Don   | 1-617-555-4444
    100 | Allen, Anna | 1-617-555-1234
    101 | Brown, Bill | 1-781-555-1212
    102 | Chu, Cindy  | 1-508-555-4321
    103 | Dodd, Don   | 1-617-555-4444
    100 | Allen, Anna | 1-617-555-1234
    101 | Brown, Bill | 1-781-555-1212
    102 | Chu, Cindy  | 1-508-555-4321
    103 | Dodd, Don   | 1-617-555-4444
(20 rows)

13.3.1 - 跟踪加载状态 (ODBC)

加载一批数据之后,客户端应用程序可以获取已处理的行数并确定每个是被接受还是被拒绝。

确定接受的行的数量

若要获取某个批所处理的行数,请将名为 SQL_ATTR_PARAMS_PROCESSED_PTR 且指向用于接收行数的变量的属性添加到语句对象中:

    SQLULEN rowsProcessed;
    SQLSetStmtAttr(hdlStmt, SQL_ATTR_PARAMS_PROCESSED_PTR, &rowsProcessed, 0);

当应用程序调用 SQLExecute() 以插入该批时,Vertica ODBC 驱动程序会将已处理的行数(不一定等于已成功插入的行数)保存到您在 SQL_ATTR_PARAMS_PROCESSED_PTR 语句属性中指定的变量。

查找接受的行和拒绝的行

应用程序还可以设置名为 SQL_ATTR_PARAM_STATUS_PTR 且指向一个数组(ODBC 驱动程序可以将插入每个行的结果存储在此数组中)的语句属性:

    SQLUSMALLINT   rowResults[ NUM_ENTRIES ];
    SQLSetStmtAttr(hdlStmt, SQL_ATTR_PARAM_STATUS_PTR, rowResults, 0);

该数组至少必须与每个批中要插入的行数一样大。

当应用程序调用 SQLExecute 以插入一个批时,ODBC 驱动程序会使用值填充该数组,这些值指示每个行已成功插入(SQL_PARAM_SUCCESS 或 SQL_PARAM_SUCCESS_WITH_INFO)还是遇到了错误 (SQL_PARAM_ERROR)。

以下示例扩展了使用批量插入中所示的示例,以便同时报告已处理的行数和插入的每个行的状态。

// Some standard headers
#include <stdio.h>
#include <stdlib.h>
// Only needed for Windows clients
// #include <windows.h>
// Standard ODBC headers
#include <sql.h>
#include <sqltypes.h>
#include <sqlext.h>
// Helper function to print SQL error messages.
template <typename HandleT>
void reportError(int handleTypeEnum, HandleT hdl)
{
    // Get the status records.
    SQLSMALLINT   i, MsgLen;
    SQLRETURN ret2;
    SQLCHAR       SqlState[6], Msg[SQL_MAX_MESSAGE_LENGTH];
    SQLINTEGER    NativeError;
    i = 1;
    printf("\n");
    while ((ret2 = SQLGetDiagRec(handleTypeEnum, hdl, i, SqlState, &NativeError,
        Msg, sizeof(Msg), &MsgLen)) != SQL_NO_DATA) {
            printf("error record %d\n", i);
            printf("sqlstate: %s\n", SqlState);
            printf("detailed msg: %s\n", Msg);
            printf("native error code: %d\n\n", NativeError);
            i++;
    }
}
int main()
{
    // Number of data rows to insert
    const int NUM_ENTRIES = 4;


    SQLRETURN ret;
    SQLHENV hdlEnv;
    ret = SQLAllocHandle(SQL_HANDLE_ENV, SQL_NULL_HANDLE, &hdlEnv);
    if(!SQL_SUCCEEDED(ret)) {
        printf("Could not allocate a handle.\n");
        exit(EXIT_FAILURE);
    } else {
        printf("Allocated an environment handle.\n");
    }
    ret = SQLSetEnvAttr(hdlEnv, SQL_ATTR_ODBC_VERSION,
        (SQLPOINTER) SQL_OV_ODBC3, SQL_IS_UINTEGER);
    if(!SQL_SUCCEEDED(ret)) {
        printf("Could not set application version to ODBC3.\n");
        exit(EXIT_FAILURE);
    } else {
        printf("Set application to ODBC 3.\n");
    }
    SQLHDBC hdlDbc;
    ret = SQLAllocHandle(SQL_HANDLE_DBC, hdlEnv, &hdlDbc);
    if(!SQL_SUCCEEDED(ret)) {
        printf("Could not allocate database handle.\n");
        exit(EXIT_FAILURE);
    } else {
        printf("Allocated Database handle.\n");
    }
    // Connect to the database
    printf("Connecting to database.\n");
    const char *dsnName = "ExampleDB";
    const char* userID = "dbadmin";
    const char* passwd = "password123";
    ret = SQLConnect(hdlDbc, (SQLCHAR*)dsnName,
        SQL_NTS,(SQLCHAR*)userID,SQL_NTS,
        (SQLCHAR*)passwd, SQL_NTS);
    if(!SQL_SUCCEEDED(ret)) {
        printf("Could not connect to database.\n");
        reportError<SQLHDBC>(SQL_HANDLE_DBC, hdlDbc);
        exit(EXIT_FAILURE);
    } else {
        printf("Connected to database.\n");
    }
    // Set up a statement handle
    SQLHSTMT hdlStmt;
    SQLAllocHandle(SQL_HANDLE_STMT, hdlDbc, &hdlStmt);
    SQLExecDirect(hdlStmt, (SQLCHAR*)"DROP TABLE IF EXISTS customers",
        SQL_NTS);
    // Create a table into which we can store data
    printf("Creating table.\n");
    ret = SQLExecDirect(hdlStmt, (SQLCHAR*)"CREATE TABLE customers "
        "(CustID int, CustName varchar(50), Phone_Number char(15));",
        SQL_NTS);
    if(!SQL_SUCCEEDED(ret)) {
        reportError<SQLHDBC>( SQL_HANDLE_STMT, hdlStmt );
        exit(EXIT_FAILURE);
    } else {
        printf("Created table.\n");
    }
    // Create the prepared statement. This will insert data into the
    // table we created above.
    printf("Creating prepared statement\n");
    ret = SQLPrepare (hdlStmt, (SQLTCHAR*)"INSERT INTO customers (CustID, "
        "CustName,  Phone_Number) VALUES(?,?,?)", SQL_NTS) ;
    if(!SQL_SUCCEEDED(ret)) {
        reportError<SQLHDBC>( SQL_HANDLE_STMT, hdlStmt );
        exit(EXIT_FAILURE);
    } else {
        printf("Created prepared statement.\n");
    }
    // This is the data to be inserted into the database.
    char custNames[][50] = { "Allen, Anna", "Brown, Bill", "Chu, Cindy",
        "Dodd, Don" };
    SQLINTEGER custIDs[] = { 100, 101, 102, 103};
    char phoneNums[][15] = {"1-617-555-1234", "1-781-555-1212",
        "1-508-555-4321", "1-617-555-4444"};
    // Bind the data arrays to the parameters in the prepared SQL
    // statement
    ret = SQLBindParameter(hdlStmt, 1, SQL_PARAM_INPUT, SQL_C_LONG, SQL_INTEGER,
        0, 0, (SQLPOINTER)custIDs, sizeof(SQLINTEGER) , NULL);
    if(!SQL_SUCCEEDED(ret)) {
        reportError<SQLHDBC>( SQL_HANDLE_STMT, hdlStmt );
        exit(EXIT_FAILURE);
    } else {
        printf("Bound CustIDs array to prepared statement\n");
    }
    // Bind CustNames
    SQLBindParameter(hdlStmt, 2, SQL_PARAM_INPUT, SQL_C_CHAR, SQL_VARCHAR,
        50, 0, (SQLPOINTER)custNames, 50, NULL);
    if(!SQL_SUCCEEDED(ret)) {
        reportError<SQLHDBC>( SQL_HANDLE_STMT, hdlStmt );
        exit(EXIT_FAILURE);
    } else {
        printf("Bound CustNames array to prepared statement\n");
    }
    // Bind phoneNums
    SQLBindParameter(hdlStmt, 3, SQL_PARAM_INPUT, SQL_C_CHAR, SQL_CHAR,
        15, 0, (SQLPOINTER)phoneNums, 15, NULL);
    if(!SQL_SUCCEEDED(ret)) {
        reportError<SQLHDBC>( SQL_HANDLE_STMT, hdlStmt );
        exit(EXIT_FAILURE);
    } else {
        printf("Bound phoneNums array to prepared statement\n");
    }
    // Set up a variable to recieve number of parameters processed.
    SQLULEN rowsProcessed;
    // Set a statement attribute to point to the variable
    SQLSetStmtAttr(hdlStmt, SQL_ATTR_PARAMS_PROCESSED_PTR, &rowsProcessed, 0);
    // Set up an array to hold the result of each row insert
    SQLUSMALLINT   rowResults[ NUM_ENTRIES ];
    // Set a statement attribute to point to the array
    SQLSetStmtAttr(hdlStmt, SQL_ATTR_PARAM_STATUS_PTR, rowResults, 0);
    // Tell the ODBC driver how many rows we have in the
    // array.
    SQLSetStmtAttr(hdlStmt, SQL_ATTR_PARAMSET_SIZE, (SQLPOINTER)NUM_ENTRIES, 0);
    // Add multiple batches to the database. This just adds the same
    // batch of data over and over again for simplicity's sake.
    for (int batchLoop=1; batchLoop<=5; batchLoop++) {
        // Execute the prepared statement, loading all of the data
        // in the arrays.
        printf("Adding Batch #%d...", batchLoop);
        ret = SQLExecute(hdlStmt);
        if(!SQL_SUCCEEDED(ret)) {
            reportError<SQLHDBC>( SQL_HANDLE_STMT, hdlStmt );
            exit(EXIT_FAILURE);
        }
        // Number of rows processed is in rowsProcessed
        printf("Params processed: %d\n", rowsProcessed);
        printf("Results of inserting each row:\n");
        int i;
        for (i = 0; i<NUM_ENTRIES; i++) {
            SQLUSMALLINT result = rowResults[i];
            switch(rowResults[i]) {
                case SQL_PARAM_SUCCESS:
                case SQL_PARAM_SUCCESS_WITH_INFO:
                    printf("  Row %d inserted successsfully\n", i+1);
                    break;
                case SQL_PARAM_ERROR:
                    printf("  Row %d was not inserted due to an error.", i+1);
                    break;
                default:
                    printf("  Row %d had some issue with it: %d\n", i+1, result);
            }
        }
    }
    // Done with batches, commit the transaction
    printf("Commit Transaction\n");
    ret = SQLEndTran(SQL_HANDLE_DBC, hdlDbc, SQL_COMMIT);
    if(!SQL_SUCCEEDED(ret)) {
        reportError<SQLHDBC>( SQL_HANDLE_STMT, hdlStmt );
    }


    // Clean up
    printf("Free handles.\n");
    ret = SQLDisconnect( hdlDbc );
    if(!SQL_SUCCEEDED(ret)) {
        printf("Error disconnecting. Transaction still open?\n");
        exit(EXIT_FAILURE);
    }
    SQLFreeHandle(SQL_HANDLE_STMT, hdlStmt);
    SQLFreeHandle(SQL_HANDLE_DBC, hdlDbc);
    SQLFreeHandle(SQL_HANDLE_ENV, hdlEnv);
    exit(EXIT_SUCCESS);
}

运行以上示例代码后,将生成以下输出:

Allocated an environment handle.Set application to ODBC 3.
Allocated Database handle.
Connecting to database.
Connected to database.
Creating table.
Created table.
Creating prepared statement
Created prepared statement.
Bound CustIDs array to prepared statement
Bound CustNames array to prepared statement
Bound phoneNums array to prepared statement
Adding Batch #1...Params processed: 4
Results of inserting each row:
  Row 1 inserted successfully
  Row 2 inserted successfully
  Row 3 inserted successfully
  Row 4 inserted successfully
Adding Batch #2...Params processed: 4
Results of inserting each row:
  Row 1 inserted successfully
  Row 2 inserted successfully
  Row 3 inserted successfully
  Row 4 inserted successfully
Adding Batch #3...Params processed: 4
Results of inserting each row:
  Row 1 inserted successfully
  Row 2 inserted successfully
  Row 3 inserted successfully
  Row 4 inserted successfully
Adding Batch #4...Params processed: 4
Results of inserting each row:
  Row 1 inserted successfully
  Row 2 inserted successfully
  Row 3 inserted successfully
  Row 4 inserted successfully
Adding Batch #5...Params processed: 4
Results of inserting each row:
  Row 1 inserted successfully
  Row 2 inserted successfully
  Row 3 inserted successfully
  Row 4 inserted successfully
Commit Transaction
Free handles.

13.3.2 - 批量加载过程中的错误处理

加载各个批时,您可以查找有关已接受的行数和已拒绝的行的信息(有关详细信息,请参阅跟踪加载状态 (ODBC))。插入各个批时,不会发生其他错误(例如磁盘空间错误)。此行为是由单个 COPY 语句对多个连续批执行加载导致的。使用单个 COPY 语句可使批量加载过程更快执行。只有 COPY 语句关闭时,才会提交批量数据,而且 Vertica 会报告其他类型错误。

批量加载应用程序应在 COPY 语句关闭时检查错误。一般情况下,可以通过调用 SQLEndTran() 函数以结束事务来强制 COPY 语句关闭。您还可以通过以下方法强制 COPY 语句关闭:使用 SQLCloseCursor() 函数关闭光标,或者在加载中插入最后一批之前将数据库连接的 AutoCommit 属性设置为 true。

13.4 - 使用 COPY 语句

COPY 允许您将存储在数据库节点上的文件中的数据批量加载到 Vertica 数据库。此方法是将数据加载到 Vertica 的最高效方法,因为文件驻留在数据库服务器上。您必须是超级用户才能使用 COPY 访问数据库节点的文件系统。

以下示例演示了如何使用 COPY 命令。

// Some standard headers
#include <stdio.h>
#include <stdlib.h>
// Only needed for Windows clients
// #include <windows.h>
// Standard ODBC headers
#include <sql.h>
#include <sqltypes.h>
#include <sqlext.h>
// Helper function to determine if an ODBC function call returned
// successfully.
bool notSuccess(SQLRETURN ret) {
    return (ret != SQL_SUCCESS && ret != SQL_SUCCESS_WITH_INFO);
}
int main()
{
    // Set up the ODBC environment
    SQLRETURN ret;
    SQLHENV hdlEnv;
    ret = SQLAllocHandle(SQL_HANDLE_ENV, SQL_NULL_HANDLE, &hdlEnv);
    if(notSuccess(ret)) {
        printf("Could not allocate a handle.\n");
        exit(EXIT_FAILURE);
    } else {
        printf("Allocated an environment handle.\n");
    }
    // Tell ODBC that the application uses ODBC 3.
    ret = SQLSetEnvAttr(hdlEnv, SQL_ATTR_ODBC_VERSION,
        (SQLPOINTER) SQL_OV_ODBC3, SQL_IS_UINTEGER);
    if(notSuccess(ret)) {
        printf("Could not set application version to ODBC3.\n");
        exit(EXIT_FAILURE);
    } else {
        printf("Set application to ODBC 3.\n");
    }
    // Allocate a database handle.
    SQLHDBC hdlDbc;
    ret = SQLAllocHandle(SQL_HANDLE_DBC, hdlEnv, &hdlDbc);
    // Connect to the database
    printf("Connecting to database.\n");
    const char *dsnName = "ExampleDB";

    // Note: User MUST be a database superuser to be able to access files on the
    // filesystem of the node.
    const char* userID = "dbadmin";
    const char* passwd = "password123";
    ret = SQLConnect(hdlDbc, (SQLCHAR*)dsnName,
        SQL_NTS,(SQLCHAR*)userID,SQL_NTS,
        (SQLCHAR*)passwd, SQL_NTS);
    if(notSuccess(ret)) {
        printf("Could not connect to database.\n");
        exit(EXIT_FAILURE);
    } else {
        printf("Connected to database.\n");
    }

    // Disable AUTOCOMMIT
    printf("Disabling autocommit.\n");
    ret = SQLSetConnectAttr(hdlDbc, SQL_ATTR_AUTOCOMMIT, SQL_AUTOCOMMIT_OFF, SQL_NTS);
    if(notSuccess(ret)) {
        printf("Could not disable autocommit.\n");
        exit(EXIT_FAILURE);
    }

    // Set up a statement handle
    SQLHSTMT hdlStmt;
    SQLAllocHandle(SQL_HANDLE_STMT, hdlDbc, &hdlStmt);
    // Create table to hold the data
    SQLExecDirect(hdlStmt, (SQLCHAR*)"DROP TABLE IF EXISTS customers",
        SQL_NTS);
    SQLExecDirect(hdlStmt, (SQLCHAR*)"CREATE TABLE customers"
        "(Last_Name char(50) NOT NULL, First_Name char(50),Email char(50), "
        "Phone_Number char(15));",
        SQL_NTS);

    // Run the copy command to load data.
    ret=SQLExecDirect(hdlStmt, (SQLCHAR*)"COPY customers "
        "FROM '/data/customers.txt'",
        SQL_NTS);
    if(notSuccess(ret)) {
        printf("Data was not successfully loaded.\n");
        exit(EXIT_FAILURE);
    } else {
        // Get number of rows added.
        SQLLEN numRows;
        ret=SQLRowCount(hdlStmt, &numRows);
        printf("Successfully inserted %d rows.\n", numRows);

    }

    // Done with batches, commit the transaction
    printf("Committing transaction\n");
    ret = SQLEndTran(SQL_HANDLE_DBC, hdlDbc, SQL_COMMIT);
    if(notSuccess(ret)) {
        printf("Could not commit transaction\n");
    }  else {
        printf("Committed transaction\n");
    }

    // Clean up
    printf("Free handles.\n");
    SQLFreeHandle(SQL_HANDLE_STMT, hdlStmt);
    SQLFreeHandle(SQL_HANDLE_DBC, hdlDbc);
    SQLFreeHandle(SQL_HANDLE_ENV, hdlEnv);
    exit(EXIT_SUCCESS);
}

该示例在运行后输出了以下内容:

Allocated an environment handle.
Set application to ODBC 3.
Connecting to database.
Connected to database.
Disabling autocommit.
Successfully inserted 10001 rows.
Committing transaction
Committed transaction
Free handles.

13.5 - 使用 COPY LOCAL 从客户端进行流式数据传输

COPY LOCAL 将数据从客户端系统文件流式传输到 Vertica 数据库。此语句通过 ODBC 驱动程序进行工作,从而简化了将数据文件从客户端传输到服务器的任务。

COPY LOCAL 通过 ODBC 驱动程序以透明方式工作。当客户端应用程序执行 COPY LOCAL 语句时,ODBC 驱动程序将从客户端读取数据文件并将其流式传输到服务器。

此示例演示如何使用 COPY LOCAL 语句从客户端系统加载数据:

// Some standard headers
#include <stdio.h>
#include <stdlib.h>
// Only needed for Windows clients
// #include <windows.h>
// Standard ODBC headers
#include <sql.h>
#include <sqltypes.h>
#include <sqlext.h>
int main()
{
    // Set up the ODBC environment
    SQLRETURN ret;
    SQLHENV hdlEnv;
    ret = SQLAllocHandle(SQL_HANDLE_ENV, SQL_NULL_HANDLE, &hdlEnv);
    if(!SQL_SUCCEEDED(ret)) {
        printf("Could not allocate a handle.\n");
        exit(EXIT_FAILURE);
    } else {
        printf("Allocated an environment handle.\n");
    }
    // Tell ODBC that the application uses ODBC 3.
    ret = SQLSetEnvAttr(hdlEnv, SQL_ATTR_ODBC_VERSION,
        (SQLPOINTER) SQL_OV_ODBC3, SQL_IS_UINTEGER);
    if(!SQL_SUCCEEDED(ret)) {
        printf("Could not set application version to ODBC3.\n");
        exit(EXIT_FAILURE);
    } else {
        printf("Set application to ODBC 3.\n");
    }
    // Allocate a database handle.
    SQLHDBC hdlDbc;
    ret = SQLAllocHandle(SQL_HANDLE_DBC, hdlEnv, &hdlDbc);
    if(!SQL_SUCCEEDED(ret)) {
        printf("Could not aalocate a database handle.\n");
        exit(EXIT_FAILURE);
    } else {
        printf("Set application to ODBC 3.\n");
    }
    // Connect to the database
    printf("Connecting to database.\n");
    const char *dsnName = "ExampleDB";
    const char* userID = "dbadmin";
    const char* passwd = "password123";
    ret = SQLConnect(hdlDbc, (SQLCHAR*)dsnName,
        SQL_NTS,(SQLCHAR*)userID,SQL_NTS,
        (SQLCHAR*)passwd, SQL_NTS);
    if(!SQL_SUCCEEDED(ret)) {
        printf("Could not connect to database.\n");
        exit(EXIT_FAILURE);
    } else {
        printf("Connected to database.\n");
    }

    // Set up a statement handle
    SQLHSTMT hdlStmt;
    SQLAllocHandle(SQL_HANDLE_STMT, hdlDbc, &hdlStmt);


    // Create table to hold the data
    SQLExecDirect(hdlStmt, (SQLCHAR*)"DROP TABLE IF EXISTS customers",
        SQL_NTS);
    SQLExecDirect(hdlStmt, (SQLCHAR*)"CREATE TABLE customers"
        "(Last_Name char(50) NOT NULL, First_Name char(50),Email char(50), "
        "Phone_Number char(15));",
        SQL_NTS);

    // Run the copy command to load data.
    ret=SQLExecDirect(hdlStmt, (SQLCHAR*)"COPY customers "
        "FROM LOCAL '/home/dbadmin/customers.txt'",
        SQL_NTS);
    if(!SQL_SUCCEEDED(ret)) {
        printf("Data was not successfully loaded.\n");
        exit(EXIT_FAILURE);
    } else {
        // Get number of rows added.
        SQLLEN numRows;
        ret=SQLRowCount(hdlStmt, &numRows);
        printf("Successfully inserted %d rows.\n", numRows);
    }

    // COPY commits automatically, unless it is told not to, so
    // there is no need to commit the transaction.

    // Clean up
    printf("Free handles.\n");
    ret = SQLDisconnect( hdlDbc );
    if(!SQL_SUCCEEDED(ret)) {
        printf("Error disconnecting. Transaction still open?\n");
        exit(EXIT_FAILURE);
    }
    SQLFreeHandle(SQL_HANDLE_STMT, hdlStmt);
    SQLFreeHandle(SQL_HANDLE_DBC, hdlDbc);
    SQLFreeHandle(SQL_HANDLE_ENV, hdlEnv);
    exit(EXIT_SUCCESS);
}

除了使用 COPY 语句的 LOCAL 选项从客户端系统而非数据库节点的文件系统加载数据之外,此示例与使用 COPY 语句中所示的示例基本相同。