Stored procedures: use cases and examples

Stored procedures in Vertica are best suited for complex, analytical workloads rather than small, transaction-heavy workloads. Some recommended use cases include information lifecycle management (ILM) activities such as extract, transform, and load (ETL), and data preparation for more complex analytical tasks such as machine learning. For example:

  • Swapping partitions based on age

  • Exporting data and dropping partitions at end of life

  • Saving a machine learning model's inputs, outputs, and metadata (for example, who ran the model, the version of the model, how many times the model was run, and who received the results) for auditing purposes

Searching for a value

The find_my_value() procedure searches any table column in a given schema for a user-specified value, and stores the locations of instances of that value in a user-specified table:

=> CREATE PROCEDURE find_my_value(p_table_schema VARCHAR(128), p_search_value VARCHAR(1000), p_results_schema VARCHAR(128), p_results_table VARCHAR(128)) AS $$
DECLARE
    sql_cmd VARCHAR(65000);
    sql_cmd_result VARCHAR(65000);
    results VARCHAR(65000);
BEGIN
    IF p_table_schema IS NULL OR p_table_schema = '' OR
        p_search_value IS NULL OR p_search_value = '' OR
        p_results_schema IS NULL OR p_results_schema = '' OR
        p_results_table IS NULL OR p_results_table = '' THEN
        RAISE EXCEPTION 'Please provide a schema to search, a search value, a results table schema, and a results table name.';
        RETURN;
    END IF;

    sql_cmd := 'CREATE TABLE IF NOT EXISTS ' || QUOTE_IDENT(p_results_schema) || '.' || QUOTE_IDENT(p_results_table) ||
        '(found_timestamp TIMESTAMP, found_value VARCHAR(1000), table_name VARCHAR(128), column_name VARCHAR(128));';
    EXECUTE sql_cmd;

    sql_cmd_result := EXECUTE 'SELECT LISTAGG(c USING PARAMETERS max_length=1000000, separator='' '')
        FROM (SELECT ''
        (SELECT '''''' || NOW() || ''''''::TIMESTAMP , ''''' || QUOTE_IDENT(p_search_value) || ''''','''''' || table_name || '''''', '''''' || column_name || ''''''
            FROM '' || table_schema || ''.'' || table_name || ''
            WHERE '' || column_name || ''::'' ||
            CASE
                WHEN data_type_id IN (17, 115, 116, 117) THEN data_type
                ELSE ''VARCHAR('' || LENGTH(''' || QUOTE_IDENT(p_search_value)|| ''') || '')'' END || '' = ''''' || QUOTE_IDENT(p_search_value) || ''''''' || DECODE(LEAD(column_name) OVER(ORDER BY table_schema, table_name, ordinal_position), NULL, '' LIMIT 1);'', '' LIMIT 1)

        UNION ALL '') c
            FROM (SELECT table_schema, table_name, column_name, ordinal_position, data_type_id, data_type
            FROM columns WHERE NOT is_system_table AND table_schema ILIKE ''' || QUOTE_IDENT(p_table_schema) || ''' AND data_type_id < 1000
            ORDER BY table_schema, table_name, ordinal_position) foo) foo;';

    results := EXECUTE 'INSERT INTO ' || QUOTE_IDENT(p_results_schema) || '.' || QUOTE_IDENT(p_results_table) || ' ' || sql_cmd_result;

    RAISE INFO 'Matches Found: %', results;
END;
$$;

For example, to search the public schema for instances of the string 'dog' and store the results in public.table_list:

=> CALL find_my_value('public', 'dog', 'public', 'table_list');
 find_my_value
---------------
             0
(1 row)

=> SELECT * FROM public.table_list;
      found_timestamp       | found_value |  table_name   | column_name
----------------------------+-------------+---------------+-------------
 2021-08-25 22:13:20.147889 | dog         | another_table | b
 2021-08-25 22:13:20.147889 | dog         | some_table    | c
(2 rows)
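The dynamic SQL that find_my_value() assembles is hard to read through the nested quoting. As a minimal sketch of the idea, in Python rather than PLvSQL and with illustrative table and column names, the procedure builds one probe SELECT per column and chains them with UNION ALL:

```python
# Sketch of the query find_my_value() generates: one single-row probe per
# column, chained with UNION ALL. Table/column names here are illustrative;
# the real procedure reads them from the COLUMNS system table and also
# casts non-string columns before comparing.
def build_search_sql(columns, search_value):
    """columns: list of (table_schema, table_name, column_name) tuples."""
    probes = []
    for schema, table, column in columns:
        probes.append(
            f"(SELECT NOW()::TIMESTAMP, '{search_value}', '{table}', '{column}' "
            f"FROM {schema}.{table} "
            f"WHERE {column}::VARCHAR = '{search_value}' LIMIT 1)"
        )
    return "\nUNION ALL ".join(probes) + ";"

sql = build_search_sql(
    [("public", "some_table", "c"), ("public", "another_table", "b")], "dog"
)
print(sql)
```

Each probe returns at most one row, so the INSERT into the results table records only whether and where the value occurs, not how often.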

Optimizing tables

You can automate loading data from Parquet files and optimizing your queries with the create_optimized_table() procedure. This procedure:

  1. Creates an external table whose structure is built from the Parquet files with the Vertica INFER_TABLE_DDL function.

  2. Creates a native Vertica table like the external table, resizing all VARCHAR columns to the MAX length of the data to be loaded.

  3. Creates a superprojection with optional segmentation/ORDER BY columns passed in as a parameter.

  4. Adds an optional primary key, passed in as a parameter, to the native table.

  5. Loads a sample data set (1 million rows) from the external table into the native table.

  6. Drops the external table.

  7. Runs the ANALYZE_STATISTICS function on the native table.

  8. Runs the DESIGNER_DESIGN_PROJECTION_ENCODINGS function to get a properly encoded superprojection for the native table.

  9. Truncates the now-optimized native table (the full data set is loaded in a separate script/stored procedure).
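The steps above reduce to a fixed sequence of SQL statements. The following Python sketch only assembles the statement strings, with simplified templates and placeholder names; the real procedure below derives the full column DDL dynamically from the external table's metadata:

```python
# Sketch of the statement sequence create_optimized_table() runs.
# "(...)" stands in for the dynamically generated column list.
def optimization_steps(path, schema, table, seg_columns=None, pk_columns=None):
    ext = f"{schema}.{table}_external"
    native = f"{schema}.{table}"
    steps = [
        f"SELECT INFER_TABLE_DDL('{path}' USING PARAMETERS format='parquet', "
        f"table_schema='{schema}', table_name='{table}_external', table_type='external');",
        f"CREATE TABLE {native} (...);",  # VARCHARs resized to observed MAX length
    ]
    if seg_columns:  # optional segmentation/ordering clause
        steps[-1] = steps[-1].replace("(...);",
            f"(...) ORDER BY {seg_columns} SEGMENTED BY HASH({seg_columns}) ALL NODES;")
    if pk_columns:   # optional primary key
        steps.append(f"ALTER TABLE {native} ADD CONSTRAINT {table}_pk "
                     f"PRIMARY KEY ({pk_columns}) ENABLED;")
    steps += [
        f"INSERT INTO {native} SELECT * FROM {ext} LIMIT 1000000;",
        f"SELECT ANALYZE_STATISTICS('{native}');",
        f"SELECT DESIGNER_DESIGN_PROJECTION_ENCODINGS('{native}', '/tmp/toss.sql', TRUE, TRUE);",
        f"TRUNCATE TABLE {native};",
        f"DROP TABLE IF EXISTS {ext} CASCADE;",
    ]
    return steps

for s in optimization_steps("/data/*.parquet", "public", "parquet_table", "c1,c2", "c1"):
    print(s)
```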


=> CREATE OR REPLACE PROCEDURE create_optimized_table(p_file_path VARCHAR(1000), p_table_schema VARCHAR(128), p_table_name VARCHAR(128), p_seg_columns VARCHAR(1000), p_pk_columns VARCHAR(1000)) LANGUAGE PLvSQL AS $$
DECLARE
    command_sql VARCHAR(1000);
    seg_columns VARCHAR(1000);
    BEGIN

-- First 3 parms are required.
-- Segmented and PK columns names, if present, must be Unquoted Identifiers
    IF p_file_path IS NULL OR p_file_path = '' THEN
        RAISE EXCEPTION 'Please provide a file path.';
    ELSEIF p_table_schema IS NULL OR p_table_schema = '' THEN
        RAISE EXCEPTION 'Please provide a table schema.';
    ELSEIF p_table_name IS NULL OR p_table_name = '' THEN
        RAISE EXCEPTION 'Please provide a table name.';
    END IF;

-- Pass optional segmented columns parameter as null or empty string if not used
    IF p_seg_columns IS NULL OR p_seg_columns = '' THEN
        seg_columns := '';
    ELSE
        seg_columns := 'ORDER BY ' || p_seg_columns || ' SEGMENTED BY HASH(' || p_seg_columns || ') ALL NODES';
    END IF;

-- Add '_external' to end of p_table_name for the external table and drop it if it already exists
    EXECUTE 'DROP TABLE IF EXISTS ' || QUOTE_IDENT(p_table_schema) || '.' || QUOTE_IDENT(p_table_name) || '_external CASCADE;';

-- Execute INFER_TABLE_DDL to generate CREATE EXTERNAL TABLE from the Parquet files
    command_sql := EXECUTE 'SELECT infer_table_ddl(' || QUOTE_LITERAL(p_file_path) || ' USING PARAMETERS format = ''parquet'', table_schema = ''' || QUOTE_IDENT(p_table_schema) || ''', table_name = ''' || QUOTE_IDENT(p_table_name) || '_external'', table_type = ''external'');';

-- Run the CREATE EXTERNAL TABLE DDL
    EXECUTE command_sql;

-- Generate the Internal/ROS Table DDL and generate column lengths based on maximum column lengths found in external table
    command_sql := EXECUTE 'SELECT LISTAGG(y USING PARAMETERS separator='' '')
        FROM ((SELECT 0 x, ''SELECT ''''CREATE TABLE ' || QUOTE_IDENT(p_table_schema) || '.' || QUOTE_IDENT(p_table_name) || '('' y
            UNION ALL SELECT ordinal_position, column_name || '' '' ||
            CASE WHEN data_type LIKE ''varchar%''
                THEN ''varchar('''' || (SELECT MAX(LENGTH('' || column_name || ''))
                    FROM '' || table_schema || ''.'' || table_name || '') || '''')'' ELSE data_type END || NVL2(LEAD('' || column_name || '', 1) OVER (ORDER BY ordinal_position), '','', '')'')
                    FROM columns WHERE table_schema = ''' || QUOTE_IDENT(p_table_schema) || ''' AND table_name = ''' || QUOTE_IDENT(p_table_name) || '_external''
                    UNION ALL SELECT 10000, ''' || seg_columns || ''' UNION ALL SELECT 10001, '';'''''') ORDER BY x) foo WHERE y <> '''';';
    command_sql := EXECUTE command_sql;
    EXECUTE command_sql;

-- Alter the Internal/ROS Table if primary key columns were passed as a parameter
    IF p_pk_columns IS NOT NULL AND p_pk_columns <> '' THEN
        EXECUTE 'ALTER TABLE ' || QUOTE_IDENT(p_table_schema) || '.' || QUOTE_IDENT(p_table_name) || ' ADD CONSTRAINT ' || QUOTE_IDENT(p_table_name) || '_pk PRIMARY KEY (' || p_pk_columns || ') ENABLED;';
    END IF;

-- Insert 1M rows into the Internal/ROS Table, analyze stats, and generate encodings
    EXECUTE 'INSERT INTO ' || QUOTE_IDENT(p_table_schema) || '.' || QUOTE_IDENT(p_table_name) || ' SELECT * FROM ' || QUOTE_IDENT(p_table_schema) || '.' || QUOTE_IDENT(p_table_name) || '_external LIMIT 1000000;';

    EXECUTE 'SELECT analyze_statistics(''' || QUOTE_IDENT(p_table_schema) || '.' || QUOTE_IDENT(p_table_name) || ''');';

    EXECUTE 'SELECT designer_design_projection_encodings(''' || QUOTE_IDENT(p_table_schema) || '.' || QUOTE_IDENT(p_table_name) || ''', ''/tmp/toss.sql'', TRUE, TRUE);';


-- Truncate the Internal/ROS Table and you are now ready to load all rows
-- Drop the external table

    EXECUTE 'TRUNCATE TABLE ' || QUOTE_IDENT(p_table_schema) || '.' || QUOTE_IDENT(p_table_name) || ';';

    EXECUTE 'DROP TABLE IF EXISTS ' || QUOTE_IDENT(p_table_schema) || '.' || QUOTE_IDENT(p_table_name) || '_external CASCADE;';

  END;

  $$;
=> call create_optimized_table('/home/dbadmin/parquet_example/*','public','parquet_table','c1,c2','c1');

create_optimized_table
------------------------
                      0
(1 row)

=> select export_objects('', 'public.parquet_table');
       export_objects
------------------------------------------
CREATE TABLE public.parquet_table
(
    c1 int NOT NULL,
    c2 varchar(36),
    c3 date,
    CONSTRAINT parquet_table_pk PRIMARY KEY (c1) ENABLED
);


CREATE PROJECTION public.parquet_table_super /*+createtype(D)*/
(
c1 ENCODING COMMONDELTA_COMP,
c2 ENCODING ZSTD_FAST_COMP,
c3 ENCODING COMMONDELTA_COMP
)
AS
SELECT parquet_table.c1,
        parquet_table.c2,
        parquet_table.c3
FROM public.parquet_table
ORDER BY parquet_table.c1,
          parquet_table.c2
SEGMENTED BY hash(parquet_table.c1, parquet_table.c2) ALL NODES OFFSET 0;

SELECT MARK_DESIGN_KSAFE(0);

(1 row)
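Note that c2 came out as varchar(36): step 2 sizes each VARCHAR column to the longest value actually observed, rather than keeping the default width from INFER_TABLE_DDL. A minimal Python sketch of that sizing rule, using made-up sample rows (the 36-character value is illustrative):

```python
# Sketch of the VARCHAR right-sizing rule from step 2: each varchar column
# is declared at MAX(LENGTH(value)) over the data to be loaded.
# Sample rows and column names are illustrative only.
def varchar_sizes(rows, varchar_columns):
    return {
        col: max(len(row[col]) for row in rows if row[col] is not None)
        for col in varchar_columns
    }

rows = [
    {"c2": "cf470c5b-50e3-492a-8483-b9e4f20d195a"},  # a 36-char UUID string
    {"c2": "short"},
]
print(varchar_sizes(rows, ["c2"]))  # c2 becomes varchar(36)
```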

Pivoting tables dynamically

The stored procedure unpivot() takes as input a source table and a target table. It unpivots the source table and outputs it into the target table.

This example uses the following table:

=> SELECT * FROM make_the_columns_into_rows;
c1  | c2  |                  c3                  |             c4             |    c5    | c6
-----+-----+--------------------------------------+----------------------------+----------+----
123 | ABC | cf470c5b-50e3-492a-8483-b9e4f20d195a | 2021-08-24 18:49:40.835802 |  1.72964 | t
567 | EFG | 25ea7636-d924-4b4f-81b5-1e1c884b06e3 | 2021-08-04 18:49:40.835802 | 41.46100 | f
890 | XYZ | f588935a-35a4-4275-9e7f-ebb3986390e3 | 2021-08-29 19:53:39.465778 |  8.58207 | t
(3 rows)

This table contains the following columns:

=> \d make_the_columns_into_rows
                                               List of Fields by Tables
Schema |           Table            | Column |     Type      | Size | Default | Not Null | Primary Key | Foreign Key
--------+----------------------------+--------+---------------+------+---------+----------+-------------+-------------
public | make_the_columns_into_rows | c1     | int           |    8 |         | f        | f           |
public | make_the_columns_into_rows | c2     | varchar(80)   |   80 |         | f        | f           |
public | make_the_columns_into_rows | c3     | uuid          |   16 |         | f        | f           |
public | make_the_columns_into_rows | c4     | timestamp     |    8 |         | f        | f           |
public | make_the_columns_into_rows | c5     | numeric(10,5) |    8 |         | f        | f           |
public | make_the_columns_into_rows | c6     | boolean       |    1 |         | f        | f           |
(6 rows)

The target table has the columns from the source table transposed into rows as key/value pairs, plus a ROWID column that ties each key/value pair back to its original row in the source table:

=> CREATE PROCEDURE unpivot(p_source_table_schema VARCHAR(128), p_source_table_name VARCHAR(128), p_target_table_schema VARCHAR(128), p_target_table_name VARCHAR(128)) AS $$
DECLARE
    explode_command VARCHAR(10000);
BEGIN
    explode_command := EXECUTE 'SELECT ''explode(string_to_array(''''['''' || '' || LISTAGG(''NVL('' || column_name || ''::VARCHAR, '''''''')'' USING PARAMETERS separator='' || '''','''' || '') || '' || '''']'''')) OVER (PARTITION BY rn)'' explode_command FROM (SELECT table_schema, table_name, column_name, ordinal_position FROM columns ORDER BY table_schema, table_name, ordinal_position LIMIT 10000000) foo WHERE table_schema = ''' || QUOTE_IDENT(p_source_table_schema) || ''' AND table_name = ''' || QUOTE_IDENT(p_source_table_name) || ''';';

    EXECUTE 'CREATE TABLE ' || QUOTE_IDENT(p_target_table_schema) || '.' || QUOTE_IDENT(p_target_table_name) || '
        AS SELECT rn rowid, column_name key, value FROM (SELECT (ordinal_position - 1) op, column_name
            FROM columns WHERE table_schema = ''' || QUOTE_IDENT(p_source_table_schema) || '''
            AND table_name = ''' || QUOTE_IDENT(p_source_table_name) || ''') a
            JOIN (SELECT rn, ' || explode_command || '
            FROM (SELECT ROW_NUMBER() OVER() rn, *
            FROM ' || QUOTE_IDENT(p_source_table_schema) || '.' || QUOTE_IDENT(p_source_table_name) || ') foo) b ON b.position = a.op';
END;
$$;

Call the procedure:

=> CALL unpivot('public', 'make_the_columns_into_rows', 'public', 'columns_into_rows');
 unpivot
---------
       0
(1 row)

=> SELECT * FROM columns_into_rows ORDER BY rowid, key;
rowid | key |                value
-------+-----+--------------------------------------
     1 | c1  | 123
     1 | c2  | ABC
     1 | c3  | cf470c5b-50e3-492a-8483-b9e4f20d195a
     1 | c4  | 2021-08-24 18:49:40.835802
     1 | c5  | 1.72964
     1 | c6  | t
     2 | c1  | 890
     2 | c2  | XYZ
     2 | c3  | f588935a-35a4-4275-9e7f-ebb3986390e3
     2 | c4  | 2021-08-29 19:53:39.465778
     2 | c5  | 8.58207
     2 | c6  | t
     3 | c1  | 567
     3 | c2  | EFG
     3 | c3  | 25ea7636-d924-4b4f-81b5-1e1c884b06e3
     3 | c4  | 2021-08-04 18:49:40.835802
     3 | c5  | 41.46100
     3 | c6  | f
(18 rows)

The unpivot() procedure also handles new columns in the source table.

Add a new column z to the source table, and then unpivot the table again with the same procedure:

=> ALTER TABLE make_the_columns_into_rows ADD COLUMN z VARCHAR;
ALTER TABLE

=> UPDATE make_the_columns_into_rows SET z = 'ZZZ' WHERE c1 IN (123, 890);
OUTPUT
--------
      2
(1 row)

=> CALL unpivot('public', 'make_the_columns_into_rows', 'public', 'columns_into_rows');
unpivot
---------
       0
(1 row)

=> SELECT * FROM columns_into_rows;
rowid | key |                value
-------+-----+--------------------------------------
     1 | c1  | 567
     1 | c2  | EFG
     1 | c3  | 25ea7636-d924-4b4f-81b5-1e1c884b06e3
     1 | c4  | 2021-08-04 18:49:40.835802
     1 | c5  | 41.46100
     1 | c6  | f
     1 | z   |   -- new column
     2 | c1  | 123
     2 | c2  | ABC
     2 | c3  | cf470c5b-50e3-492a-8483-b9e4f20d195a
     2 | c4  | 2021-08-24 18:49:40.835802
     2 | c5  | 1.72964
     2 | c6  | t
     2 | z   | ZZZ   -- new column
     3 | c1  | 890
     3 | c2  | XYZ
     3 | c3  | f588935a-35a4-4275-9e7f-ebb3986390e3
     3 | c4  | 2021-08-29 19:53:39.465778
     3 | c5  | 8.58207
     3 | c6  | t
     3 | z   | ZZZ   -- new column
(21 rows)
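The transformation unpivot() performs, one (rowid, key, value) triple per column of each source row, can be sketched in a few lines of Python. Dictionaries stand in for the source rows here; the actual procedure does the same work in a single dynamic SQL statement built around EXPLODE, and its ROW_NUMBER()-based rowids do not necessarily follow source insertion order:

```python
# Sketch of the unpivot transformation: each source row becomes one
# (rowid, key, value) triple per column. Row numbering here is positional;
# the procedure assigns rowids with ROW_NUMBER() OVER(), so rowid order
# is not guaranteed to match the source order.
def unpivot(rows):
    triples = []
    for rowid, row in enumerate(rows, start=1):
        for key, value in row.items():
            # NULLs come through as empty strings, matching the procedure's NVL()
            triples.append((rowid, key, "" if value is None else str(value)))
    return triples

source = [
    {"c1": 123, "c2": "ABC", "z": "ZZZ"},
    {"c1": 567, "c2": "EFG", "z": None},
]
for t in unpivot(source):
    print(t)
```

Because the column list is read from the COLUMNS system table at call time, newly added columns such as z appear automatically, as shown above.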

Machine learning: optimizing AUC estimation

The ROC function can approximate the AUC (area under the curve) with a precision that depends on the num_bins parameter: larger values of num_bins provide more precise approximations, but may impact performance.

You can approximate the AUC with the stored procedure accurate_auc(), which automatically determines the best num_bins value for a given epsilon (error term):

=> CREATE PROCEDURE accurate_auc(relation VARCHAR, observation_col VARCHAR, probability_col VARCHAR, epsilon FLOAT) AS $$
DECLARE
    auc_value FLOAT;
    previous_auc FLOAT;
    nbins INT;
BEGIN
    IF epsilon > 0.25 THEN
        RAISE EXCEPTION 'epsilon must not be bigger than 0.25';
    END IF;
    IF epsilon < 1e-12 THEN
        RAISE EXCEPTION 'epsilon must be bigger than 1e-12';
    END IF;
    auc_value := 0.5;
    previous_auc := 0; -- epsilon and auc should always be less than 1
    nbins := 100;
    WHILE abs(auc_value - previous_auc) > epsilon and nbins < 1000000 LOOP
        RAISE INFO 'auc_value: %', auc_value;
        RAISE INFO 'previous_auc: %', previous_auc;
        RAISE INFO 'nbins: %', nbins;
        previous_auc := auc_value;
        auc_value := EXECUTE 'SELECT auc FROM (select roc(' || QUOTE_IDENT(observation_col) || ',' || QUOTE_IDENT(probability_col) || ' USING parameters num_bins=$1, auc=true) over() FROM ' || QUOTE_IDENT(relation) || ') subq WHERE auc IS NOT NULL' USING nbins;
        nbins := nbins * 2;
    END LOOP;
    RAISE INFO 'Result_auc_value: %', auc_value;
END;
$$;
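The convergence loop in accurate_auc() can be prototyped outside the database. The sketch below doubles the bin count until two successive AUC approximations agree within epsilon; auc_with_bins() is a simplified stand-in for Vertica's ROC function (threshold the scores at evenly spaced cut-offs, then integrate TPR over FPR with the trapezoidal rule), and the sample data is made up:

```python
# Sketch of accurate_auc()'s convergence loop. auc_with_bins() is a
# simplified substitute for ROC(... num_bins=n, auc=true).
def auc_with_bins(obs, prob, nbins):
    pos = sum(obs)
    neg = len(obs) - pos
    pts = []
    for i in range(nbins + 1):
        t = i / nbins  # decision threshold for this bin
        tp = sum(1 for o, p in zip(obs, prob) if p >= t and o == 1)
        fp = sum(1 for o, p in zip(obs, prob) if p >= t and o == 0)
        pts.append((fp / neg, tp / pos))  # (FPR, TPR) point on the ROC curve
    pts.sort()
    # Trapezoidal rule over the ROC points
    return sum((x2 - x1) * (y1 + y2) / 2 for (x1, y1), (x2, y2) in zip(pts, pts[1:]))

def accurate_auc(obs, prob, epsilon=0.01):
    auc, previous, nbins = 0.5, 0.0, 100
    while abs(auc - previous) > epsilon and nbins < 1_000_000:
        previous = auc
        auc = auc_with_bins(obs, prob, nbins)
        nbins *= 2  # double the resolution until the estimate stabilizes
    return auc

obs = [1, 1, 1, 0, 0, 0, 1, 0]
prob = [0.9, 0.8, 0.7, 0.6, 0.3, 0.2, 0.55, 0.1]
print(accurate_auc(obs, prob))
```

The stored procedure follows the same shape, delegating each estimate to ROC with `num_bins=$1, auc=true` and stopping once the estimate changes by less than epsilon.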

For example, given the following data in test_data.csv:

1,0,0.186
1,1,0.993
1,1,0.9
1,1,0.839
1,0,0.367
1,0,0.362
0,1,0.6
1,1,0.726
...

(For the complete data set, see test_data.csv.)

You can load the data into a table categorical_test_data as follows:

=> \set datafile '\'/data/test_data.csv\''
=> CREATE TABLE categorical_test_data(obs INT, pred INT, prob FLOAT);
CREATE TABLE

=> COPY categorical_test_data FROM :datafile DELIMITER ',';

Call accurate_auc(). For this example, the approximated AUC is within an epsilon of 0.01:

=> CALL accurate_auc('categorical_test_data', 'obs', 'prob', 0.01);
INFO 2005:  auc_value: 0.5
INFO 2005:  previous_auc: 0
INFO 2005:  nbins: 100
INFO 2005:  auc_value: 0.749597423510467
INFO 2005:  previous_auc: 0.5
INFO 2005:  nbins: 200
INFO 2005:  Result_auc_value: 0.750402576489533

test_data.csv

1,0,0.186
1,1,0.993
1,1,0.9
1,1,0.839
1,0,0.367
1,0,0.362
0,1,0.6
1,1,0.726
0,0,0.087
0,0,0.004
0,1,0.562
1,0,0.477
0,0,0.258
1,0,0.143
0,0,0.403
1,1,0.978
1,1,0.58
1,1,0.51
0,0,0.424
0,1,0.546
0,1,0.639
0,1,0.676
0,1,0.639
1,1,0.757
1,1,0.883
1,0,0.301
1,1,0.846
1,0,0.129
1,1,0.76
1,0,0.351
1,1,0.803
1,1,0.527
1,1,0.836
1,0,0.417
1,1,0.656
1,1,0.977
1,1,0.815
1,1,0.869
0,0,0.474
0,0,0.346
1,0,0.188
0,1,0.805
1,1,0.872
1,0,0.466
1,1,0.72
0,0,0.163
0,0,0.085
0,0,0.124
1,1,0.876
0,0,0.451
0,0,0.185
1,1,0.937
1,1,0.615
0,0,0.312
1,1,0.924
1,1,0.638
1,1,0.891
0,1,0.621
1,0,0.421
0,0,0.254
0,0,0.225
1,1,0.577
0,1,0.579
0,1,0.628
0,1,0.855
1,1,0.955
0,0,0.331
1,0,0.298
0,0,0.047
0,0,0.173
1,1,0.96
0,0,0.481
0,0,0.39
0,0,0.088
1,0,0.417
0,0,0.12
1,1,0.871
0,1,0.522
0,0,0.312
1,1,0.695
0,0,0.155
0,0,0.352
1,1,0.561
0,0,0.076
0,1,0.923
1,0,0.169
0,0,0.032
1,1,0.63
0,0,0.126
0,0,0.15
1,0,0.348
0,0,0.188
0,1,0.755
1,1,0.813
0,0,0.418
1,0,0.161
1,0,0.316
0,1,0.558
1,1,0.641
1,0,0.305