使用 VHash 对数据进行预分段
VHash 类是 Vertica 哈希函数的实施,可与 JDBC 客户端应用程序结合使用。
使用 Vertica 中的哈希分段,您可以基于内置的哈希函数对投影进行分段。内置的哈希函数可使数据平均分布到群集中的部分或全部节点,从而提供最佳的查询执行。
假设您有数百万个值行分布在几千个 CSV 文件之中。假设您已创建了一个使用哈希算法进行分段的表。将值加载到数据库之前,您可能想要确定应将特定值加载到哪个节点。因此,使用 VHash 会特别有帮助,因为此类允许您在加载数据之前对数据进行预分段。
以下示例显示了对名为“testFile.csv”的文件的第一列使用哈希算法的 VHash 类。该文件中第一列的名称为 meterId。
使用 VHash 对数据进行分段
以下示例演示了如何从本地文件系统读取 testFile.csv 文件并对 meteterId 列运行哈希函数。然后,您可以使用投影中的数据库元数据基于 meterId 的哈希值对该文件中的各个列进行预分段。
import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.FileReader;
import java.io.UnsupportedEncodingException;
import java.util.*;
import java.io.IOException;
import java.sql.*;
import com.vertica.jdbc.kv.VHash;
public class VerticaKVDoc {
final Map<String, FileOutputStream> files;
final Map<String, List<Long>> nodeToHashList;
String segmentationMetadata;
List<String> lines;
public static void main(String[] args) throws Exception {
try {
Class.forName("com.vertica.jdbc.Driver");
} catch (ClassNotFoundException e) {
System.err.println("Could not find the JDBC driver class.");
e.printStackTrace();
return;
}
Properties myProp = new Properties();
myProp.put("user", "username");
myProp.put("password", "password");
VerticaKVDoc ex = new VerticaKVDoc();
// Read in the data from a CSV file.
ex.readLinesFromFile("C:\\testFile.csv");
try (Connection conn = DriverManager.getConnection(
"jdbc:vertica://VerticaHost:portNumber/databaseName", myProp)) {
// Compute the hashes and create FileOutputStreams.
ex.prepareForHashing(conn);
}
// Write to files.
ex.writeLinesToFiles();
}
public VerticaKVDoc() {
files = new HashMap<String, FileOutputStream>();
nodeToHashList = new HashMap<String, List<Long>>();
}
public void prepareForHashing(Connection conn) throws SQLException,
FileNotFoundException {
// Send a query to Vertica to return the projection segments.
try (ResultSet rs = conn.createStatement().executeQuery(
"SELECT get_projection_segments('public.projectionName')")) {
rs.next();
segmentationMetadata = rs.getString(1);
}
// Initialize the data files.
try (ResultSet rs = conn.createStatement().executeQuery(
"SELECT node_name FROM nodes")) {
while (rs.next()) {
String node = rs.getString(1);
files.put(node, new FileOutputStream(node + ".csv"));
}
}
}
public void writeLinesToFiles() throws UnsupportedEncodingException,
IOException {
for (String line : lines) {
long hashedValue = VHash.hashLong(getMeterIdFromLine(line));
// Write the row data to that node's data file.
String node = VHash.getNodeFor(segmentationMetadata, hashedValue);
FileOutputStream fos = files.get(node);
fos.write(line.getBytes("UTF-8"));
}
}
private long getMeterIdFromLine(String line) {
// In our file, "meterId" is the name of the first column in the file.
return Long.parseLong(line.split(",")[0]);
}
public void readLinesFromFile(String filename) throws IOException {
lines = new ArrayList<String>();
String line;
try (BufferedReader reader = new BufferedReader(
new FileReader(filename))) {
while ((line = reader.readLine()) != null) {
lines.add(line);
}
}
}
}