使用 VHash 对数据进行预分段

VHash 类是 Vertica 哈希函数的实施,可与 JDBC 客户端应用程序结合使用。

使用 Vertica 中的哈希分段,您可以基于内置的哈希函数对投影进行分段。内置的哈希函数可使数据平均分布到群集中的部分或全部节点,从而提供最佳的查询执行。

假设您有数百万个值行分布在几千个 CSV 文件之中。假设您已创建了一个使用哈希算法进行分段的表。将值加载到数据库之前,您可能想要确定应将特定值加载到哪个节点。因此,使用 VHash 会特别有帮助,因为此类允许您在加载数据之前对数据进行预分段。

以下示例显示了对名为“testFile.csv”的文件的第一列使用哈希算法的 VHash 类。该文件中第一列的名称为 meterId

使用 VHash 对数据进行分段

以下示例演示了如何从本地文件系统读取 testFile.csv 文件并对 meteterId 列运行哈希函数。然后,您可以使用投影中的数据库元数据基于 meterId 的哈希值对该文件中的各个列进行预分段。

import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.FileReader;
import java.io.UnsupportedEncodingException;
import java.util.*;
import java.io.IOException;
import java.sql.*;

import com.vertica.jdbc.kv.VHash;

public class VerticaKVDoc {

    final Map<String, FileOutputStream> files;
    final Map<String, List<Long>> nodeToHashList;
    String segmentationMetadata;
    List<String> lines;

    public static void main(String[] args) throws Exception {
        try {
            Class.forName("com.vertica.jdbc.Driver");
        } catch (ClassNotFoundException e) {
            System.err.println("Could not find the JDBC driver class.");
            e.printStackTrace();
            return;
        }

        Properties myProp = new Properties();
        myProp.put("user", "username");
        myProp.put("password", "password");

        VerticaKVDoc ex = new VerticaKVDoc();

        // Read in the data from a CSV file.
        ex.readLinesFromFile("C:\\testFile.csv");

        try (Connection conn = DriverManager.getConnection(
                "jdbc:vertica://VerticaHost:portNumber/databaseName", myProp)) {

        // Compute the hashes and create FileOutputStreams.
        ex.prepareForHashing(conn);

        }

        // Write to files.
        ex.writeLinesToFiles();
    }

    public VerticaKVDoc() {
        files = new HashMap<String, FileOutputStream>();
        nodeToHashList = new HashMap<String, List<Long>>();
    }

    public void prepareForHashing(Connection conn) throws SQLException,
            FileNotFoundException {

        // Send a query to Vertica to return the projection segments.
        try (ResultSet rs = conn.createStatement().executeQuery(
                "SELECT get_projection_segments('public.projectionName')")) {
            rs.next();
            segmentationMetadata = rs.getString(1);
        }

        // Initialize the data files.
        try (ResultSet rs = conn.createStatement().executeQuery(
                "SELECT node_name FROM nodes")) {
            while (rs.next()) {
                String node = rs.getString(1);
                files.put(node, new FileOutputStream(node + ".csv"));
            }
        }
    }

    public void writeLinesToFiles() throws UnsupportedEncodingException,
            IOException {
        for (String line : lines) {

            long hashedValue = VHash.hashLong(getMeterIdFromLine(line));

            // Write the row data to that node's data file.
            String node = VHash.getNodeFor(segmentationMetadata, hashedValue);

            FileOutputStream fos = files.get(node);
            fos.write(line.getBytes("UTF-8"));
        }
    }

    private long getMeterIdFromLine(String line) {

        // In our file, "meterId" is the name of the first column in the file.
        return Long.parseLong(line.split(",")[0]);
    }

    public void readLinesFromFile(String filename) throws IOException {
        lines = new ArrayList<String>();
        String line;
        try (BufferedReader reader = new BufferedReader(
                new FileReader(filename))) {
            while ((line = reader.readLine()) != null) {
                lines.add(line);
            }
        }
    }

}