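Java Example: Numeric Text
This NumericTextParser example parses integer values that are spelled out in words rather than written as digits (for example, "one two three" for one hundred twenty-three). The parser: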
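- Accepts a single parameter that sets the character separating the columns in a row of data. The separator defaults to the pipe (|) character.
- Ignores extra spaces and the capitalization of the words used to spell out numbers.
- Recognizes digits using the following words: zero, one, two, three, four, five, six, seven, eight, and nine.
- Assumes that the words spelling out an integer are separated by at least one space.
- Rejects any row of data that cannot be completely parsed into integers.
- Generates an error if the output table contains a non-integer column.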
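The word-to-integer rule described in the list above can be tried in isolation, without the Vertica SDK. The following is a minimal sketch only; the class name NumericWordsSketch and the method decode() are hypothetical and are not part of the example library.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical standalone sketch; not part of the Vertica example library.
class NumericWordsSketch {
    // Digit words in index order, so a word's position is its numeric value.
    private static final List<String> DIGITS = Arrays.asList(
        "zero", "one", "two", "three", "four",
        "five", "six", "seven", "eight", "nine");

    // Returns the decoded value, or null if any word is not a digit word.
    static Long decode(String text) {
        long value = 0;
        // Trim, ignore case, and treat any run of whitespace as one gap.
        for (String word : text.trim().toLowerCase().split("\\s+")) {
            int digit = DIGITS.indexOf(word);
            if (digit < 0) {
                return null; // caller would reject the whole row
            }
            value = value * 10 + digit;
        }
        return value;
    }

    public static void main(String[] args) {
        System.out.println(decode("one two three"));  // prints 123
        System.out.println(decode("  One   ZERO  ")); // prints 10
        System.out.println(decode("Three Zed Zed"));  // prints null
    }
}
```

Returning null rather than throwing mirrors the "reject the row" behavior: the caller decides what to do with an unparseable value.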
Loading and Using the Example
Load and use the parser as follows:
=> CREATE LIBRARY JavaLib AS '/home/dbadmin/JavaLib.jar' LANGUAGE 'JAVA';
CREATE LIBRARY
=> CREATE PARSER NumericTextParser AS LANGUAGE 'java'
-> NAME 'com.myCompany.UDParser.NumericTextParserFactory'
-> LIBRARY JavaLib;
CREATE PARSER FUNCTION
=> CREATE TABLE t (i INTEGER);
CREATE TABLE
=> COPY t FROM STDIN WITH PARSER NumericTextParser();
Enter data to be copied followed by a newline.
End with a backslash and a period on a line by itself.
>> One
>> Two
>> One Two Three
>> \.
=> SELECT * FROM t ORDER BY i;
i
-----
1
2
123
(3 rows)
=> DROP TABLE t;
DROP TABLE
=> -- Parse multi-column input
=> CREATE TABLE t (i INTEGER, j INTEGER);
CREATE TABLE
=> COPY t FROM stdin WITH PARSER NumericTextParser();
Enter data to be copied followed by a newline.
End with a backslash and a period on a line by itself.
>> One | Two
>> Two | Three
>> One Two Three | four Five Six
>> \.
=> SELECT * FROM t ORDER BY i;
i | j
-----+-----
1 | 2
2 | 3
123 | 456
(3 rows)
=> TRUNCATE TABLE t;
TRUNCATE TABLE
=> -- Use alternate separator character
=> COPY t FROM STDIN WITH PARSER NumericTextParser(separator='*');
Enter data to be copied followed by a newline.
End with a backslash and a period on a line by itself.
>> Five * Six
>> seven * eight
>> nine * one zero
>> \.
=> SELECT * FROM t ORDER BY i;
i | j
---+----
5 | 6
7 | 8
9 | 10
(3 rows)
=> TRUNCATE TABLE t;
TRUNCATE TABLE
=> -- Rows containing data that does not parse into digits are rejected.
=> DROP TABLE t;
DROP TABLE
=> CREATE TABLE t (i INTEGER);
CREATE TABLE
=> COPY t FROM STDIN WITH PARSER NumericTextParser();
Enter data to be copied followed by a newline.
End with a backslash and a period on a line by itself.
>> One Zero Zero
>> Two Zero Zero
>> Three Zed Zed
>> Four Zero Zero
>> Five Zed Zed
>> \.
=> SELECT * FROM t ORDER BY i;
i
-----
100
200
400
(3 rows)
=> -- Generate an error by trying to copy into a table with a non-integer column
=> DROP TABLE t;
DROP TABLE
=> CREATE TABLE t (i INTEGER, j VARCHAR);
CREATE TABLE
=> COPY t FROM STDIN WITH PARSER NumericTextParser();
vsql:UDParse.sql:94: ERROR 3399: Failure in UDx RPC call
InvokeGetReturnTypeParser(): Error in User Defined Object [NumericTextParser],
error code: 0
com.vertica.sdk.UdfException: Column 2 of output table is not an Int
at com.myCompany.UDParser.NumericTextParserFactory.getParserReturnType
(NumericTextParserFactory.java:70)
at com.vertica.udxfence.UDxExecContext.getReturnTypeParser(
UDxExecContext.java:1464)
at com.vertica.udxfence.UDxExecContext.getReturnTypeParser(
UDxExecContext.java:768)
at com.vertica.udxfence.UDxExecContext.run(UDxExecContext.java:236)
at java.lang.Thread.run(Thread.java:662)
Parser Implementation
The following code implements the parser.
package com.myCompany.UDParser;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;
import com.vertica.sdk.DataBuffer;
import com.vertica.sdk.DestroyInvocation;
import com.vertica.sdk.RejectedRecord;
import com.vertica.sdk.ServerInterface;
import com.vertica.sdk.State.InputState;
import com.vertica.sdk.State.StreamState;
import com.vertica.sdk.StreamWriter;
import com.vertica.sdk.UDParser;
import com.vertica.sdk.UdfException;
public class NumericTextParser extends UDParser {
private String separator; // Holds column separator character
// List of strings that we accept as digits.
private List<String> numbers = Arrays.asList("zero", "one",
"two", "three", "four", "five", "six", "seven",
"eight", "nine");
// Hold information about the last rejected row.
private String rejectedReason;
private String rejectedRow;
// Constructor gets the separator character from the Factory's prepare()
// method.
public NumericTextParser(String sepparam) {
super();
this.separator = sepparam;
}
// Called to perform the actual work of parsing. Gets a buffer of bytes
// to turn into tuples.
@Override
public StreamState process(ServerInterface srvInterface, DataBuffer input,
InputState input_state) throws UdfException, DestroyInvocation {
int i=input.offset; // Current position in the input buffer
// Flag to indicate whether we just found the end of a row.
boolean lastCharNewline = false;
// Buffer to hold the row of data being read.
StringBuffer line = new StringBuffer();
//Continue reading until end of buffer.
for(; i < input.buf.length; i++){
// Loop through input until we find a linebreak: marks end of row
char inchar = (char) input.buf[i];
// Note that this isn't a robust way to find rows. It should
// accept a user-defined row separator. Also, the following
// assumes ASCII line break methods, which isn't a good idea
// in the UTF world. But it is good enough for this simple example.
if (inchar != '\n' && inchar != '\r') {
// Keep adding to a line buffer until a full row of data is read
line.append(inchar);
lastCharNewline = false; // Last character not a new line
} else {
// Found a line break. Process the row.
lastCharNewline = true; // indicate we got a complete row
// Update the position in the input buffer. This is updated
// whether the row is successfully processed or not.
input.offset = i+1;
// Call processRow to extract values and write tuples to the
// output. Returns false if there was an error.
if (!processRow(line)) {
// Processing row failed. Save bad row to rejectedRow field
// and return to caller indicating a rejected row.
rejectedRow = line.toString();
// input.offset already points past this row (set above).
return StreamState.REJECT;
}
line.delete(0, line.length()); // clear row buffer
}
}
// At this point, process() has finished processing the input buffer.
// There are two possibilities: need to get more data
// from the input stream to finish processing, or there is
// no more data to process. If at the end of the input stream and
// the row was not terminated by a linefeed, it may need
// to process the last row.
if (input_state == InputState.END_OF_FILE && lastCharNewline) {
// End of input and it ended on a newline. Nothing more to do
return StreamState.DONE;
} else if (input_state == InputState.END_OF_FILE && !lastCharNewline) {
// At end of input stream but didn't get a final newline. Need to
// process the final row that was read in, then exit for good.
if (line.length() == 0) {
// Nothing to process. Done parsing.
return StreamState.DONE;
}
// Need to parse the last row, not terminated by a linefeed. This
// can occur if the file being read didn't have a final line break.
if (processRow(line)) {
return StreamState.DONE;
} else {
// Processing last row failed. Save bad row to rejectedRow field
// and return to caller indicating a rejected row.
rejectedRow = line.toString();
// Tell Vertica the entire buffer was processed so it won't
// call again to have the line processed.
input.offset = input.buf.length;
return StreamState.REJECT;
}
} else {
// Stream is not fully read, so tell Vertica to send more. If
// process() didn't get a complete row before it hit the end of the
// input buffer, it will end up re-processing that segment again
// when more data is added to the buffer.
return StreamState.INPUT_NEEDED;
}
}
// Breaks a row into columns, then parses the content of the
// columns. Returns false if there was an error parsing the
// row, in which case it sets the rejected row to the input
// line. Returns true if the row was successfully read.
private boolean processRow(StringBuffer line)
throws UdfException, DestroyInvocation {
String[] columns = line.toString().split(Pattern.quote(separator));
// Loop through the columns, decoding their contents
for (int col = 0; col < columns.length; col++) {
// Call decodeColumn to extract value from this column
Integer colval = decodeColumn(columns[col]);
if (colval == null) {
// Could not parse one of the columns. Indicate row should
// be rejected.
return false;
}
// Column parsed OK. Write it to the output. writer is a field
// provided by the parent class. Since this parser only accepts
// integers, there is no need to verify that data type of the parsed
// data matches the data type of the column being written. In your
// UDParsers, you may want to perform this verification.
writer.setLong(col,colval);
}
// Done with the row of data. Advance output to next row.
// Note that this example does not verify that all of the output columns
// have values assigned to them. If there are missing values at the
// end of a row, they automatically get assigned a default value
// (0 for integers). This isn't a robust solution. Your UDParser
// should perform checks here to handle this situation and set values
// (such as null) when appropriate.
writer.next();
return true; // Successfully processed the row.
}
// Gets a string with text numerals, e.g. "One Two Five Seven", and turns
// it into an integer value, e.g. 1257. Returns null if the string could not
// be parsed completely into numbers.
private Integer decodeColumn(String text) {
int value = 0; // Hold the value being parsed.
// Split string into individual words. Eat extra spaces.
String[] words = text.toLowerCase().trim().split("\\s+");
// Loop through the words, matching them against the list of
// digit strings.
for (int i = 0; i < words.length; i++) {
if (numbers.contains(words[i])) {
// Matched a digit. Add it to the value.
int digit = numbers.indexOf(words[i]);
value = (value * 10) + digit;
} else {
// The string didn't match one of the accepted string values
// for digits. Need to reject the row. Set the rejected
// reason string here so it can be incorporated into the
// rejected reason object.
//
// Note that this example does not handle null column values.
// In most cases, you want to differentiate between an
// unparseable column value and a missing piece of input
// data. This example just rejects the row if there is a missing
// column value.
rejectedReason = String.format(
"Could not parse '%s' into a digit",words[i]);
return null;
}
}
return value;
}
// Vertica calls this method if the parser rejected a row of data
// to find out what went wrong and add to the proper logs. Just gathers
// information stored in fields and returns it in an object.
@Override
public RejectedRecord getRejectedRecord() throws UdfException {
return new RejectedRecord(rejectedReason,rejectedRow.toCharArray(),
rejectedRow.length(), "\n");
}
}
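The processRow() method above passes the separator through Pattern.quote() before calling split(). A short standalone sketch of why that matters follows: String.split() treats its argument as a regular expression, and both the default separator (|) and the alternate one used earlier (*) are regex metacharacters. The class name SeparatorSplitSketch and the method splitLiteral() are hypothetical and exist only for this illustration.

```java
import java.util.Arrays;
import java.util.regex.Pattern;

// Hypothetical standalone sketch; not part of the Vertica example library.
class SeparatorSplitSketch {
    // Splits a row on a literal separator, even if it is a regex metacharacter.
    static String[] splitLiteral(String row, String separator) {
        return row.split(Pattern.quote(separator));
    }

    public static void main(String[] args) {
        // Unquoted, "|" is an alternation of two empty patterns, so it
        // matches between characters and the row is cut into single characters.
        System.out.println(Arrays.toString("One|Two".split("|")));

        // Quoted, the separator is taken literally. Surrounding spaces are
        // kept here; the parser trims them later when decoding each column.
        System.out.println(Arrays.toString(splitLiteral("One | Two", "|")));
        System.out.println(Arrays.toString(splitLiteral("Five * Six", "*")));

        // Note: "Five * Six".split("*") would throw PatternSyntaxException,
        // because a bare "*" is not a valid regular expression.
    }
}
```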
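ParserFactory Implementation
The following code implements the parser factory.
NumericTextParser accepts a single optional parameter named separator. You define this parameter in the getParameterType() method, and the plan() method stores its value. NumericTextParser outputs only integer values. Therefore, if the output table contains a column whose data type is not integer, the getParserReturnType() method throws an exception.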
package com.myCompany.UDParser;
import java.util.regex.Pattern;
import com.vertica.sdk.ParamReader;
import com.vertica.sdk.ParamWriter;
import com.vertica.sdk.ParserFactory;
import com.vertica.sdk.PerColumnParamReader;
import com.vertica.sdk.PlanContext;
import com.vertica.sdk.ServerInterface;
import com.vertica.sdk.SizedColumnTypes;
import com.vertica.sdk.UDParser;
import com.vertica.sdk.UdfException;
import com.vertica.sdk.VerticaType;
public class NumericTextParserFactory extends ParserFactory {
// Called once on the initiator host to check the parameters and set up the
// context data that hosts performing processing will need later.
@Override
public void plan(ServerInterface srvInterface,
PerColumnParamReader perColumnParamReader,
PlanContext planCtxt) {
String separator = "|"; // assume separator is pipe character
// See if a parameter was given for column separator
ParamReader args = srvInterface.getParamReader();
if (args.containsParameter("separator")) {
separator = args.getString("separator");
if (separator.length() > 1) {
throw new UdfException(0,
"Separator parameter must be a single character");
}
// Test the raw string: Pattern.quote() wraps it in \Q...\E, which
// would never match the letter class.
if (separator.matches("[a-zA-Z]")) {
throw new UdfException(0,
"Separator parameter cannot be a letter");
}
}
// Save separator character in the Plan Data
ParamWriter context = planCtxt.getWriter();
context.setString("separator", separator);
}
// Define the data types of the output table that the parser will return.
// Mainly, this just ensures that all of the columns in the table which
// is the target of the data load are integer.
@Override
public void getParserReturnType(ServerInterface srvInterface,
PerColumnParamReader perColumnParamReader,
PlanContext planCtxt,
SizedColumnTypes argTypes,
SizedColumnTypes returnType) {
// Get access to the output table's columns
for (int i = 0; i < argTypes.getColumnCount(); i++ ) {
if (argTypes.getColumnType(i).isInt()) {
// Column is integer... add it to the output
returnType.addInt(argTypes.getColumnName(i));
} else {
// Column isn't an int, so throw an exception.
// Technically, this is not necessary since the
// UDx framework will automatically error out when it sees a
// discrepancy between the type in the target table and the
// types declared by this method. Throwing this exception will
// provide a clearer error message to the user.
String message = String.format(
"Column %d of output table is not an Int", i + 1);
throw new UdfException(0, message);
}
}
}
// Instantiate the UDParser subclass named NumericTextParser. Passes the
// separator character as a parameter to the constructor.
@Override
public UDParser prepare(ServerInterface srvInterface,
PerColumnParamReader perColumnParamReader, PlanContext planCtxt,
SizedColumnTypes returnType) throws UdfException {
// Get the separator character from the context
String separator = planCtxt.getReader().getString("separator");
return new NumericTextParser(separator);
}
// Describe the parameters accepted by this parser.
@Override
public void getParameterType(ServerInterface srvInterface,
SizedColumnTypes parameterTypes) {
parameterTypes.addVarchar(1, "separator");
}
}
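The separator checks performed in plan() can be exercised on their own. The sketch below is one possible way to express them and makes two assumptions that differ from the listing above: it requires exactly one character (the listing only rejects strings longer than one), and it uses Character.isLetter(), which also rejects non-ASCII letters. The class name SeparatorCheckSketch and the method validate() are hypothetical. The letter test is applied to the raw string because Pattern.quote("a") returns \Qa\E, which a character class such as [a-zA-Z] cannot match.

```java
// Hypothetical standalone sketch; not part of the Vertica example library.
class SeparatorCheckSketch {
    // Returns null if the separator is acceptable, otherwise a reason string.
    static String validate(String separator) {
        // Assumption: exactly one character is required.
        if (separator.length() != 1) {
            return "Separator parameter must be a single character";
        }
        // Assumption: any Unicode letter is refused, not only a-z and A-Z.
        // A letter separator would be ambiguous with the digit words.
        if (Character.isLetter(separator.charAt(0))) {
            return "Separator parameter cannot be a letter";
        }
        return null;
    }

    public static void main(String[] args) {
        System.out.println(validate("|"));  // prints null
        System.out.println(validate("*"));  // prints null
        System.out.println(validate("e"));  // prints the letter message
        System.out.println(validate("||")); // prints the length message
    }
}
```

In a real factory, a non-null result would be turned into a UdfException, as the plan() method above does.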