适用于 Vertica 集成的 Hive 入门知识

您可以使用 Hive 导出数据以供 Vertica 使用。您在执行 Hive 导出时所做的决定会影响 Vertica 的性能。

调整 ORC 带区和 Parquet 行组

Vertica 可以读取任何 Hive 版本生成的 ORC 和 Parquet 文件。不过,较新的 Hive 版本在这些文件中存储了更多元数据。Hive 和 Vertica 都使用这些元数据来修剪值并仅读取所需的数据。使用最新的 Hive 版本以采用这些格式存储数据。ORC 和 Parquet 完全向前和向后兼容。要获得最佳性能,请使用 Hive 0.14 或更高版本。

ORC 格式将表拆分为称作“带区”的行分组,并在每个带区中存储列级元数据。Parquet 格式将表拆分为称作“行组”的行分组,并在每个行组中存储列级元数据。在谓词评估期间使用每个带区/行组的元数据来确定是否需要读取该带区中的值。大的带区通常会产生较好的性能,因此请将带区大小设置为至少 256M。

Hive 将 ORC 带区和 Parquet 行组写入 HDFS,HDFS 将数据存储在分布于多个物理数据节点的 HDFS 块中。访问 HDFS 块需要打开与相应数据节点的单独连接。最好确保 ORC 带区或 Parquet 行组不跨越多个 HDFS 块。为此,请将 HDFS 块大小设置为大于带区/行组大小。通常,将 HDFS 块大小设置为 512M 就足够了。

Hive 提供三种压缩选项:无、Snappy 和 Zlib。使用 Snappy 或 Zlib 压缩可减少存储和 I/O 消耗。通常,Snappy 的 CPU 密集度较低,但与 Zlib 相比可以产生更低的压缩率。

按排序顺序存储数据可以提高数据访问速度和谓词评估性能。根据表列在查询谓词中出现的可能性对其进行排序;应先对在比较或范围谓词中最常出现的列进行排序。

分区表是一种非常有用的数据组织技术。类似于按列对表进行排序,分区可以提高数据访问和谓词评估性能。Vertica 支持 Hive 样式的分区和分区修剪。

以下 Hive 语句创建带区大小为 256M 且采用 Zlib 压缩的 ORC 表:

hive> CREATE TABLE customer_visits (
        customer_id bigint,
        visit_num int,
        page_view_dt date)
    STORED AS ORC tblproperties("orc.compress"="ZLIB",
        "orc.stripe.size"="268435456");

以下语句创建带区大小为 256M 且采用 Zlib 压缩的 Parquet 表:

hive> CREATE TABLE customer_visits (
        customer_id bigint,
        visit_num int,
        page_view_dt date)
    STORED AS PARQUET tblproperties("parquet.compression"="ZLIB",
        "parquet.stripe.size"="268435456");

在 Hive 中创建排序文件

与 Vertica 不同,Hive 不会将表列存储在单独的文件中,并且不会为每个表创建具有不同排序顺序的多个投影。为了实现高效的数据访问和谓词下推,根据 Hive 表列在查询谓词中出现的可能性对它们进行排序。最常出现在比较或范围谓词中的列应排在前面。

可以使用 ORDER BY 或 SORT BY 关键字将数据按排序顺序插入 Hive 表。例如,要在 ORC 表 "customer_visit" 中插入具有相同列的另一个表 "visits" 中的数据,请在 INSERT INTO 命令中使用以下关键字:

hive> INSERT INTO TABLE customer_visits
        SELECT * from visits
        ORDER BY page_view_dt;
hive> INSERT INTO TABLE customer_visits
        SELECT * from visits
        SORT BY page_view_dt;

这两个关键字之间的区别在于 ORDER BY 通过使用单个 MapReduce reducer 填充表来保证整个表的全局排序。SORT BY 使用多个 reducer,这可能导致 ORC 或 Parquet 文件按指定列排序,而不能全局排序。使用靠后的关键字会增加加载文件所需的时间。

您可以结合聚类和排序来对表进行全局排序。以下表定义添加了一个提示,即数据插入到此表中,按 customer_id 分桶并按 page_view_dt 排序:

hive> CREATE TABLE customer_visits_bucketed (
        customer_id bigint,
        visit_num int,
        page_view_dt date)
    CLUSTERED BY (page_view_dt)
    SORTED BY (page_view_dt)INTO 10 BUCKETS
    STORED AS ORC;

将数据插入到表中时,必须显式指定聚类和排序列,如下例所示:

hive> INSERT INTO TABLE customer_visits_bucketed
    SELECT * from visits
    DISTRIBUTE BY page_view_dt
    SORT BY page_view_dt;

下面的语句是等效的:

hive> INSERT INTO TABLE customer_visits_bucketed
    SELECT * from visits
    CLUSTER BY page_view_dt;

上述两个命令都将数据插入到 customer_visits_bucketed 表中,并在 page_view_dt 列上全局排序。

对 Hive 表进行分区

Hive 中的表分区是一种用于数据分离和组织以及减少存储需求的有效技术。要在 Hive 中对表进行分区,请将其包含在 PARTITIONED BY 子句中:

hive> CREATE TABLE customer_visits (
        customer_id bigint,
        visit_num int)
    PARTITIONED BY (page_view_dt date)
    STORED AS ORC;

Hive 不实体化分区列。相反,它会创建以下形式的子目录:

path_to_table/partition_column_name=value/

查询表时,Hive 解析子目录名称以实体化分区列中的值。Hive 中的值实体化是从字符串到适当数据类型的简单转换。

将数据插入分区表需要指定分区列的值。以下示例创建两个分区子目录 "customer_visits/page_view_dt=2016-02-01" 和 "customer_visits/page_view_dt=2016-02-02":

hive> INSERT INTO TABLE customer_visits
    PARTITION (page_view_dt='2016-02-01')
    SELECT customer_id, visit_num from visits
    WHERE page_view_dt='2016-02-01'
    ORDER BY page_view_dt;

hive> INSERT INTO TABLE customer_visits
    PARTITION (page_view_dt='2016-02-02')
    SELECT customer_id, visit_num from visits
    WHERE page_view_dt='2016-02-02'
    ORDER BY page_view_dt;

每个目录都包含 ORC 文件,其中包含两列 customer_id 和 visit_num。

示例:已分区且排序的 ORC 表

假设您将数据存储在包含 customer_id、visit_num、page_view_dtm 这三列的 CSV 文件中:

1,123,2016-01-01
33,1,2016-02-01
2,57,2016-01-03
...

目标是创建以下 Hive 表:

hive> CREATE TABLE customer_visits (
            customer_id bigint,
            visit_num int)
    PARTITIONED BY (page_view_dt date)
    STORED AS ORC;

为此,请执行以下步骤:

  1. 将 CSV 文件复制或移动到 HDFS。

  2. 定义一个文本文件 Hive 表并将 CSV 文件复制到其中:

    hive> CREATE TABLE visits (
                customer_id bigint,
                visit_num int,
                page_view_dt date)
        ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
        STORED AS TEXTFILE;
    
    hive> LOAD DATA INPATH path_to_csv_files INTO TABLE visits;
    
  3. 对于 page_view_dt 中的每个唯一值,将数据插入目标表,同时将 page_view_dt 实体化为 page_view_dtm:

    hive> INSERT INTO TABLE customer_visits
            PARTITION (page_view_dt='2016-01-01')
            SELECT customer_id, visit_num FROM visits
            WHERE page_view_dt='2016-01-01'
            ORDER BY page_view_dt;
    ...
    

    此操作将 visits.customer_id 中的数据插入 customer_visits.customer_id,并将 visits.visit_num 中的数据插入 customer_visits.visit_num。这两列存储在生成的 ORC 文件中。同时,visits.page_view_dt 中的值用于为分区列 customer_visits.page_view_dt(该列不存储在 ORC 文件中)创建分区。

Hive 中的数据修改

Hive 非常适合读取大量的一次性写入数据。它的最佳用法是将数据批量加载到表中,并且从不修改数据。特别是,对于以 ORC 和 Parquet 格式存储的数据,这种使用模式会生成大型的全局(或几乎全局)排序文件。

定期向表中添加数据(称为“涓流式加载”)可能会产生许多小文件。这样做的缺点是 Vertica 在查询计划和执行期间必须访问更多文件。这些额外的访问可能会导致查询处理时间延长。主要的性能下降是因为 HDFS 上文件搜索次数的增加。

Hive 还可以在无需用户干预的情况下修改底层 ORC 或 Parquet 文件。例如,如果 Hive 表中有足够多的记录被修改或删除,Hive 会删除现有文件并用新创建的文件替换它们。Hive 还可以配置为自动将许多小文件合并为几个较大的文件。

在 Hive 中创建新表或修改现有表时,必须手动同步 Vertica 以使其保持最新。以下语句在 Hive 发生更改后同步 Vertica 架构 "hcat":

=> SELECT sync_with_hcatalog_schema('hcat_local', 'hcat');

Hive 中的架构演变

Hive 支持两种架构演变:

  1. 可以将新列添加到 Hive 中的现有表。Vertica 会自动处理这种架构演变。对于新列,旧记录将显示 NULL。
  2. 可以在 Hive 中修改表的列类型。Vertica 不支持这种架构演变。

下面的示例演示了通过新列进行的架构演变。在此示例中,hcat.parquet.txt 是具有以下值的文件:

-1|0.65|0.65|6|'b'
hive> create table hcat.parquet_tmp (a int, b float, c double, d int, e varchar(4))
      row format delimited fields terminated by '|' lines terminated by '\n';

hive> load data local inpath 'hcat.parquet.txt' overwrite into table
      hcat.parquet_tmp;

hive> create table hcat.parquet_evolve (a int) partitioned by (f int) stored as
      parquet;
hive> insert into table hcat.parquet_evolve partition (f=1) select a from
      hcat.parquet_tmp;
hive> alter table hcat.parquet_evolve add columns (b float);
hive> insert into table hcat.parquet_evolve partition (f=2) select a, b from
      hcat.parquet_tmp;
hive> alter table hcat.parquet_evolve add columns (c double);
hive> insert into table hcat.parquet_evolve partition (f=3) select a, b, c from
      hcat.parquet_tmp;
hive> alter table hcat.parquet_evolve add columns (d int);
hive> insert into table hcat.parquet_evolve partition (f=4) select a, b, c, d from
      hcat.parquet_tmp;
hive> alter table hcat.parquet_evolve add columns (e varchar(4));
hive> insert into table hcat.parquet_evolve partition (f=5) select a, b, c, d, e
      from hcat.parquet_tmp;
hive> insert into table hcat.parquet_evolve partition (f=6) select a, b, c, d, e
      from hcat.parquet_tmp;

=> SELECT * from hcat_local.parquet_evolve;

  a |         b         |  c   | d | e | f
----+-------------------+------+---+---+---
 -1 |                   |      |   |   | 1
 -1 | 0.649999976158142 |      |   |   | 2
 -1 | 0.649999976158142 | 0.65 |   |   | 3
 -1 | 0.649999976158142 | 0.65 | 6 |   | 4
 -1 | 0.649999976158142 | 0.65 | 6 | b | 5
 -1 | 0.649999976158142 | 0.65 | 6 | b | 6
(6 rows)