This is the multi-page printable view of this section.
Click here to print.
Return to the regular view of this page.
Querying external tables
After you create an external table, you can query it as you would query any other table.
After you create an external table, you can query it as you would query any other table. Suppose you have created the following external tables:
=> CREATE EXTERNAL TABLE catalog (id INT, description VARCHAR, category VARCHAR)
AS COPY FROM 'hdfs:///dat/catalog.csv' DELIMITER ',';
CREATE TABLE
=> CREATE EXTERNAL TABLE inventory(storeID INT, prodID INT, quantity INT)
AS COPY FROM 'hdfs:///dat/inventory.csv' DELIMITER ',';
CREATE TABLE
You can now write queries against these tables, such as the following:
=> SELECT * FROM catalog;
id | description | category
----+----------------------+-------------
10 | 24in monitor | computers
11 | 27in monitor | computers
12 | 24in IPS monitor | computers
20 | 1TB USB drive | computers
21 | 2TB USB drive | computers
22 | 32GB USB thumb drive | computers
30 | 40in LED TV | electronics
31 | 50in LED TV | electronics
32 | 60in plasma TV | electronics
(9 rows)
=> SELECT * FROM inventory;
storeID | prodID | quantity
---------+--------+----------
502 | 10 | 17
502 | 11 | 2
517 | 10 | 1
517 | 12 | 2
517 | 12 | 4
542 | 10 | 3
542 | 11 | 11
542 | 12 | 1
(8 rows)
=> SELECT inventory.storeID,catalog.description,inventory.quantity
FROM inventory JOIN catalog ON inventory.prodID = catalog.id;
storeID | description | quantity
---------+------------------+----------
502 | 24in monitor | 17
517 | 24in monitor | 1
542 | 24in monitor | 3
502 | 27in monitor | 2
542 | 27in monitor | 11
517 | 24in IPS monitor | 2
517 | 24in IPS monitor | 4
542 | 24in IPS monitor | 1
(8 rows)
One important difference between external tables and Vertica native tables is that querying an external table reads the external data every time. (See How external tables differ from native tables.) Specifically, each time a select query references the external table, Vertica parses the COPY statement definition again to access the data. Certain errors in either your table definition or your data do not become apparent until you run a query, so test your external tables before deploying them in a production environment.
Handling errors
Querying external table data with an incorrect COPY FROM statement definition can potentially result in many rejected rows. To limit the number of rejections, Vertica sets the maximum number of retained rejections with the ExternalTablesExceptionsLimit
configuration parameter. The default value is 100. Setting the ExternalTablesExceptionsLimit
to –1
removes the limit, but is not recommended.
If COPY errors reach the maximum number of rejections, the external table query continues, but COPY generates a warning in the vertica.log
file and does not report subsequent rejected rows.
Using the ExternalTablesExceptionsLimit
configuration parameter differs from using the COPY statement REJECTMAX
parameter to set a low rejection threshold. The REJECTMAX
value controls how many rejected rows to permit before causing the load to fail. If COPY encounters a number of rejected rows equal to or greater than REJECTMAX
, COPY aborts execution instead of logging a warning in vertica.log
.
Queries that include joins perform better if the smaller table is the inner one. For native tables, the query optimizer uses cardinality to choose the inner table. For external tables, the query optimizer uses the row count if available.
After you create an external table, use ANALYZE_EXTERNAL_ROW_COUNT to collect this information. Calling this function is potentially expensive because it has to materialize one column of the table to be able to count the rows, so do this analysis when your database is not busy with critical queries. (This is why Vertica does not perform this operation automatically when you create the table.)
The query optimizer uses the results of your most-recent call to this function when planning queries. If the volume of data changes significantly, therefore, you should run it again to provide updated statistics. A difference of a few percent does not matter, but if your data volume grows by 20% or more, you should repeat this operation when able.
If you have ORC or Parquet data, you can take advantage of optimizations including partition pruning and predicate pushdown. See Improving query performance.
Using external tables with user-defined load (UDL) functions
You can use external tables in conjunction with UDL functions that you create. For more information about using UDLs, see User Defined Load (UDL).
1 - Using partition columns
An ORC or Parquet file contains data columns.
An ORC or Parquet file contains data columns. To these files you can add partition columns at write time. The data files do not store values for partition columns; instead, when writing the files you divide them into groups (partitions) based on column values. You can use partitioning to improve the performance of queries that restrict results by the partitioned column.
For example, if you have a table with a date column, and you know you will be writing queries restricted to particular dates, you can partition by date. Thus, Vertica can skip reading some files entirely when executing your date-restricted queries. This behavior is called partition pruning.
You can create partitions regardless of where you store the files—in HDFS, in an S3 bucket, on a local file system, or in a shared file system such as NFS.
You can use Hive or EXPORT TO PARQUET to create partitions, or you can create them manually. For information about creating partitions as part of exporting data from Vertica, see Partitioning and sorting data. See Improving Query Performance for information about tuning partitions.
Partition structure
By default, both Hive and Vertica write Hadoop columnar format files that contain the data for all table columns without partitioning. The column data is laid out in stripes, or groups of row data. When Vertica loads this data it reads all of the stripes.
If you partition the data, however, you can avoid writing some of that data into the files and thus reduce the amount to be read. Instead of storing a column's data in the files, you create a directory structure that partitions the data based on the value in a column.
For example, if the data includes a date column, you can write each date as a separate partition. Each partition is a directory with a name of the form "column=value". If you have a date column named "created" that is partitioned by day, you would have the following directory structure:
path/created=2016-11-01/*
path/created=2016-11-02/*
path/created=2016-11-03/*
path/...
As this example shows, the files in each subdirectory contain all columns except the "created" column.
You can partition by more than one column, creating a layered structure. For example, adding another partitioned column, "region", to the preceding example would produce the following directory structure:
path/created=2016-11-01/region=northeast/*
path/created=2016-11-01/region=central/*
path/created=2016-11-01/region=southeast/*
path/created=2016-11-01/...
path/created=2016-11-02/region=northeast/*
path/created=2016-11-02/region=central/*
path/created=2016-11-02/region=southeast/*
path/created=2016-11-02/...
path/created=2016-11-03/...
path/...
With this change, the data files contain all columns except "created" and "region".
Note
The files must contain at least one real (not partitioned) column. You cannot partition by every column in a table.
You can create partitions for columns of any simple data type. As a best practice, however, you should avoid partitioning columns with BOOLEAN, FLOAT, and NUMERIC types.
Under some circumstances Hive writes a partition with a value of __HIVE_DEFAULT_PARTITION__. Vertica treats these values as NULL.
COPY syntax
When creating an external table from partitioned data, you must do all of the following:
-
In the column definition in the external table, if you are using strong schema matching (the default), list the partition columns last and in order.
-
In the path, use wildcards to include all of the levels of directories and files.
-
In the ORC or PARQUET statement, specify the partition columns in the hive_partition_cols parameter. (The argument name is the same even if you didn't use Hive to do the partitioning; it refers to Hive-style partitions.) When using strong schema matching, you must list the names in order.
The following example creates an external table using the partitioned data shown previously. The table includes four columns. Two columns, "id" and "name", are in the data files. The other two, "created" and "region", are partitioned.
=> CREATE EXTERNAL TABLE records (id int, name varchar(50), created date, region varchar(50))
AS COPY FROM 'webhdfs:///path/*/*/*'
ORC(hive_partition_cols='created,region');
The path includes one wildcard (*) for each level of directory partitioning and then one more for the files. The number of wildcards must always be one more than the number of partitioned columns.
You do not need to include all of the partitioned columns in hive_partition_cols if those columns are not relevant for your queries. However, the partition columns must be the last columns in the table definition. For example, you can define the following table for the partitioned data shown previously:
=> CREATE EXTERNAL TABLE records (id int, name varchar(50), created date, region varchar(50))
AS COPY FROM 'webhdfs:///path/*/*/*' ORC(hive_partition_cols='region');
Values in the "created" column are all null because no data appears in the files for that column and hive_partition_cols does not include it.
However, the following example produces an error.
=> CREATE EXTERNAL TABLE records (id int, name varchar(50), created date, region varchar(50))
AS COPY FROM 'webhdfs:///path/*/*/*' ORC(hive_partition_cols='created');
In this example, the table definition includes the "region" column after the "created" column, and "region" is not included in hive_partition_cols. Because this column is not listed as a partition column, Vertica interprets it as a data column and produces an error because the column is not present.
If Vertica cannot convert a partition value to the declared type for that column, it sets the value to NULL. The following example incorrectly declares region to be an integer rather than a varchar.
=> CREATE EXTERNAL TABLE records (id int, name varchar(50), created date, region int)
AS COPY FROM 'webhdfs:///path/*/*/*' ORC(hive_partition_cols='region');
Vertica cannot coerce a directory named "region=northeast" into an integer value, so it sets that column value to NULL for all rows it reads from this directory. If you declare the column with IS NOT NULL, Vertica rejects the row. If the number of rows exceeds REJECTMAX, Vertica reports an error.
Note
If you change how files are partitioned on disk, you must re-create your external tables.
Queries
When executing queries with predicates, Vertica skips subdirectories that do not satisfy the predicate. This process is called partition pruning and it can significantly improve query performance. See Improving query performance for more information about partition pruning and other techniques for optimizing queries.
The following example reads only the partitions for the specified region, for all dates. Although the data is also partitioned by date, the query does not restrict the date.
=> SELECT * FROM t WHERE region='northeast';
To verify that Vertica is pruning partitions, look in the explain plan for a message similar to the following:
files with unmatched Hive partition have been pruned
2 - Improving query performance
When working with external tables in the Parquet and ORC columnar formats, Vertica tries to improve performance in the following ways:.
When working with external tables in the Parquet and ORC columnar formats, Vertica tries to improve performance in the following ways:
-
By pushing query execution closer to the data so less has to be read and transmitted. Vertica uses the following specific techniques: predicate pushdown, column selection, and partition pruning.
-
By taking advantage of data locality in the query plan.
-
By analyzing the row count to get the best join orders in the query plan.
The following figure illustrates optimizations that can reduce the amount of data to be read:
Tuning ORC stripes and Parquet rowgroups
Vertica can read ORC and Parquet files generated by any Hive version. However, newer Hive versions store more metadata in these files. This metadata is used by both Hive and Vertica to prune values and to read only the required data. Use the latest Hive version to store data in these formats. ORC and Parquet are fully forward- and backward-compatible. To get the best performance, use Hive 0.14 or later.
The ORC format splits a table into groups of rows called stripes and stores column-level metadata in each stripe. The Parquet format splits a table into groups of rows called rowgroups and stores column-level metadata in each rowgroup. Each stripe/rowgroup's metadata is used during predicate evaluation to determine whether the values from this stripe need to be read. Large stripes usually yield better performance, so set the stripe size to at least 256M.
Hive writes ORC stripes and Parquet rowgroups to HDFS, which stores data in HDFS blocks distributed among multiple physical data nodes. Accessing an HDFS block requires opening a separate connection to the corresponding data node. It is advantageous to ensure that an ORC stripe or Parquet rowgroup does not span more than one HDFS block. To do so, set the HDFS block size to be larger than the stripe/rowgroup size. Setting HDFS block size to 512M is usually sufficient.
Hive provides three compression options: None, Snappy, and Zlib. Use Snappy or Zlib compression to reduce storage and I/O consumption. Usually, Snappy is less CPU-intensive but can yield lower compression ratios compared to Zlib.
Storing data in sorted order can improve data access and predicate evaluation performance. Sort table columns based on the likelihood of their occurrence in query predicates; columns that most frequently occur in comparison or range predicates should be sorted first.
Partitioning tables is a very useful technique for data organization. Similarly to sorting tables by columns, partitioning can improve data access and predicate evaluation performance. Vertica supports Hive-style partitions and partition pruning.
The following Hive statement creates an ORC table with stripe size 256M and Zlib compression:
hive> CREATE TABLE customer_visits (
customer_id bigint,
visit_num int,
page_view_dt date)
STORED AS ORC tblproperties("orc.compress"="ZLIB",
"orc.stripe.size"="268435456");
The following statement creates a Parquet table with stripe size 256M and Zlib compression:
hive> CREATE TABLE customer_visits (
customer_id bigint,
visit_num int,
page_view_dt date)
STORED AS PARQUET tblproperties("parquet.compression"="ZLIB",
"parquet.stripe.size"="268435456");
Predicate pushdown and column selection
Predicate pushdown moves parts of the query execution closer to the data, reducing the amount of data that must be read from disk or across the network. ORC files have three levels of indexing: file statistics, stripe statistics, and row group indexes. Predicates are applied only to the first two levels. Parquet files have two levels of statistics: rowgroup statistics and page statistics. Predicates are only applied to the first level.
Predicate pushdown is automatically applied for files written with Hive version 0.14 and later. ORC files written with earlier versions of Hive might not contain the required statistics. When executing a query against a file that lacks these statistics, Vertica logs an EXTERNAL_PREDICATE_PUSHDOWN_NOT_SUPPORTED event in the QUERY_EVENTS system table. If you are seeing performance problems with your queries, check this table for these events.
Another query performance optimization technique used by Vertica is column selection. Vertica reads from ORC or Parquet files only the columns specified in the query statement. For example, the following statement reads only the customer_id and visit_num columns from the corresponding ORC files:
=> CREATE EXTERNAL TABLE customer_visits (
customer_id bigint,
visit_num int,
page_view_dt date)
AS COPY FROM '...' ORC;
=> SELECT customer_id from customer_visits
WHERE visit_num > 10;
Data locality
In a cluster where Vertica nodes are co-located on HDFS nodes, the query can use data locality to improve performance. For Vertica to do so, both the following conditions must exist::
When both these conditions exist, the query planner uses the co-located database node to read that data locally, instead of making a network call.
You can see how much data is being read locally by inspecting the query plan. The label for LoadStep(s) in the plan contains a statement of the form: "X% of ORC/Parquet data matched with co-located Vertica nodes". To increase the volume of local reads, consider adding more database nodes. HDFS data, by its nature, can't be moved to specific nodes, but if you run more database nodes you increase the likelihood that a database node is local to one of the copies of the data.
Creating sorted files in Hive
Unlike Vertica, Hive does not store table columns in separate files and does not create multiple projections per table with different sort orders. For efficient data access and predicate pushdown, sort Hive table columns based on the likelihood of their occurrence in query predicates. Columns that most frequently occur in comparison or range predicates should be sorted first.
Data can be inserted into Hive tables in a sorted order by using the ORDER BY or SORT BY keywords. For example, to insert data into the ORC table "customer_visit" from another table "visits" with the same columns, use these keywords with the INSERT INTO command:
hive> INSERT INTO TABLE customer_visits
SELECT * from visits
ORDER BY page_view_dt;
hive> INSERT INTO TABLE customer_visits
SELECT * from visits
SORT BY page_view_dt;
The difference between the two keywords is that ORDER BY guarantees global ordering on the entire table by using a single MapReduce reducer to populate the table. SORT BY uses multiple reducers, which can cause ORC or Parquet files to be sorted by the specified column(s) but not be globally sorted. Using the latter keyword can increase the time taken to load the file.
You can combine clustering and sorting to sort a table globally. The following table definition adds a hint that data is inserted into this table bucketed by customer_id and sorted by page_view_dt:
hive> CREATE TABLE customer_visits_bucketed (
customer_id bigint,
visit_num int,
page_view_dt date)
CLUSTERED BY (page_view_dt)
SORTED BY (page_view_dt)INTO 10 BUCKETS
STORED AS ORC;
When inserting data into the table, you must explicitly specify the clustering and sort columns, as in the following example:
hive> INSERT INTO TABLE customer_visits_bucketed
SELECT * from visits
DISTRIBUTE BY page_view_dt
SORT BY page_view_dt;
The following statement is equivalent:
hive> INSERT INTO TABLE customer_visits_bucketed
SELECT * from visits
CLUSTER BY page_view_dt;
Both of the above commands insert data into the customer_visits_bucketed table, globally sorted on the page_view_dt column.
Partitioning Hive tables
Table partitioning in Hive is an effective technique for data separation and organization, as well as for reducing storage requirements. To partition a table in Hive, include it in the PARTITIONED BY clause:
hive> CREATE TABLE customer_visits (
customer_id bigint,
visit_num int)
PARTITIONED BY (page_view_dt date)
STORED AS ORC;
Hive does not materialize partition column(s). Instead, it creates subdirectories of the following form:
path_to_table/partition_column_name=value/
When the table is queried, Hive parses the subdirectories' names to materialize the values in the partition columns. The value materialization in Hive is a plain conversion from a string to the appropriate data type.
Inserting data into a partitioned table requires specifying the value(s) of the partition column(s). The following example creates two partition subdirectories, "customer_visits/page_view_dt=2016-02-01" and "customer_visits/page_view_dt=2016-02-02":
hive> INSERT INTO TABLE customer_visits
PARTITION (page_view_dt='2016-02-01')
SELECT customer_id, visit_num from visits
WHERE page_view_dt='2016-02-01'
ORDER BY page_view_dt;
hive> INSERT INTO TABLE customer_visits
PARTITION (page_view_dt='2016-02-02')
SELECT customer_id, visit_num from visits
WHERE page_view_dt='2016-02-02'
ORDER BY page_view_dt;
Each directory contains ORC files with two columns, customer_id and visit_num.
Accessing partitioned data from Vertica
Vertica recognizes and supports Hive-style partitions. You can read partition values and data using the HCatalog Connector or the COPY statement.
If you use the HCatalog Connector, you must create an HCatalog schema in Vertica that mirrors a schema in Hive:
=> CREATE EXTERNAL TABLE customer_visits (customer_id int, visit_num int,
page_view_dtm date)
AS COPY FROM 'hdfs://host:port/path/customer_visits/*/*' ORC
(hive_partition_cols='page_view_dtm');
The following statement reads all ORC files stored in all sub-directories including the partition values:
=> SELECT customer_id, visit_num, page_view FROM customer_visits;
When executing queries with predicates on partition columns, Vertica uses the subdirectory names to skip files that do not satisfy the predicate. This process is called partition pruning.
You can also define a separate external table for each subdirectory, as in the following example:
=> CREATE EXTERNAL TABLE customer_visits_20160201 (customer_id int,
visit_num int, page_view_dtm date)
AS COPY FROM
'hdfs://host:port/path/customer_visits/page_view_dt=2016-02-01/*' ORC;
Example: a partitioned, sorted ORC table
Suppose you have data stored in CSV files containing three columns: customer_id, visit_num, page_view_dtm:
1,123,2016-01-01
33,1,2016-02-01
2,57,2016-01-03
...
The goal is to create the following Hive table:
hive> CREATE TABLE customer_visits (
customer_id bigint,
visit_num int)
PARTITIONED BY (page_view_dt date)
STORED AS ORC;
To achieve this, perform the following steps:
-
Copy or move the CSV files to HDFS.
-
Define a textfile Hive table and copy the CSV files into it:
hive> CREATE TABLE visits (
customer_id bigint,
visit_num int,
page_view_dt date)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
STORED AS TEXTFILE;
hive> LOAD DATA INPATH path_to_csv_files INTO TABLE visits;
-
For each unique value in page_view_dt, insert the data into the target table while materializing page_view_dt as page_view_dtm:
hive> INSERT INTO TABLE customer_visits
PARTITION (page_view_dt='2016-01-01')
SELECT customer_id, visit_num FROM visits
WHERE page_view_dt='2016-01-01'
ORDER BY page_view_dt;
...
This operation inserts data from visits.customer_id into customer_visits.customer_id, and from visits.visit_num into customer_visits.visit_num. These two columns are stored in generated ORC files. Simultaneously, values from visits.page_view_dt are used to create partitions for the partition column customer_visits.page_view_dt, which is not stored in the ORC files.
Data modification in Hive
Hive is well-suited for reading large amounts of write-once data. Its optimal usage is loading data in bulk into tables and never modifying the data. In particular, for data stored in the ORC and Parquet formats, this usage pattern produces large, globally (or nearly globally) sorted files.
Periodic addition of data to tables (known as “trickle load”) is likely to produce many small files. The disadvantage of this is that Vertica has to access many more files during query planning and execution. These extra access can result in longer query-processing time. The major performance degradation comes from the increase in the number of file seeks on HDFS.
Hive can also modify underlying ORC or Parquet files without user involvement. If enough records in a Hive table are modified or deleted, for example, Hive deletes existing files and replaces them with newly-created ones. Hive can also be configured to automatically merge many small files into a few larger files.
When new tables are created, or existing tables are modified in Hive, you must manually synchronize Vertica to keep it up to date. The following statement synchronizes the Vertica schema "hcat" after a change in Hive:
=> SELECT sync_with_hcatalog_schema('hcat_local', 'hcat');
Schema evolution in Hive
Hive supports two kinds of schema evolution:
- New columns can be added to existing tables in Hive. Vertica automatically handles this kind of schema evolution. The old records display NULLs for the newer columns.
- The type of a column for a table can be modified in Hive. Vertica does not support this kind of schema evolution.
The following example demonstrates schema evolution through new columns. In this example, hcat.parquet.txt is a file with the following values:
-1|0.65|0.65|6|'b'
hive> create table hcat.parquet_tmp (a int, b float, c double, d int, e varchar(4))
row format delimited fields terminated by '|' lines terminated by '\n';
hive> load data local inpath 'hcat.parquet.txt' overwrite into table
hcat.parquet_tmp;
hive> create table hcat.parquet_evolve (a int) partitioned by (f int) stored as
parquet;
hive> insert into table hcat.parquet_evolve partition (f=1) select a from
hcat.parquet_tmp;
hive> alter table hcat.parquet_evolve add columns (b float);
hive> insert into table hcat.parquet_evolve partition (f=2) select a, b from
hcat.parquet_tmp;
hive> alter table hcat.parquet_evolve add columns (c double);
hive> insert into table hcat.parquet_evolve partition (f=3) select a, b, c from
hcat.parquet_tmp;
hive> alter table hcat.parquet_evolve add columns (d int);
hive> insert into table hcat.parquet_evolve partition (f=4) select a, b, c, d from
hcat.parquet_tmp;
hive> alter table hcat.parquet_evolve add columns (e varchar(4));
hive> insert into table hcat.parquet_evolve partition (f=5) select a, b, c, d, e
from hcat.parquet_tmp;
hive> insert into table hcat.parquet_evolve partition (f=6) select a, b, c, d, e
from hcat.parquet_tmp;
=> SELECT * from hcat_local.parquet_evolve;
a | b | c | d | e | f
----+-------------------+------+---+---+---
-1 | | | | | 1
-1 | 0.649999976158142 | | | | 2
-1 | 0.649999976158142 | 0.65 | | | 3
-1 | 0.649999976158142 | 0.65 | 6 | | 4
-1 | 0.649999976158142 | 0.65 | 6 | b | 5
-1 | 0.649999976158142 | 0.65 | 6 | b | 6
(6 rows)