C++ 示例:分隔解析器和块分割器
ExampleDelimitedUDChunker
类在分隔符处划分输入。您可以将此块分割器与任何能够解析分隔输入的解析器一起使用。ExampleDelimitedParser
是一个使用这个块分割器的 ContinuousUDParser
子类。
加载和使用示例
按如下所示加载并使用示例。
=> CREATE LIBRARY ExampleDelimitedParserLib AS '/home/dbadmin/EDP.so';
=> CREATE PARSER ExampleDelimitedParser AS
LANGUAGE 'C++' NAME 'DelimitedParserFrameworkExampleFactory'
LIBRARY ExampleDelimitedParserLib;
=> COPY t FROM stdin WITH PARSER ExampleDelimitedParser();
0
1
2
3
4
5
6
7
8
9
\.
块分割器实现
该块分割器支持分摊加载。alignPortion()
方法在当前部分中找到第一个完整记录的开头,并将输入缓冲区与之对齐。记录终止符作为实参传递给构造函数,并在构造函数中设置。
/*
 * Aligns the start of this chunker's portion to a record boundary.
 *
 * The bytes up to and including the first record terminator in the portion
 * belong to a record that started in the previous portion, so they are skipped.
 *
 * @param srvInterface  server interface (not used here)
 * @param input         buffer scanned from input.offset to input.size; on DONE,
 *                      input.offset is advanced to the byte just after the
 *                      first record terminator
 * @param state         state of the input stream
 * @return DONE if a record terminator was found;
 *         REJECT if none was found and state is END_OF_FILE or END_OF_PORTION;
 *         INPUT_NEEDED otherwise (state must be START_OF_PORTION or OK)
 */
StreamState ExampleDelimitedUDChunker::alignPortion(
        ServerInterface &srvInterface,
        DataBuffer &input, InputState state)
{
    /* find the first record terminator. Its record belongs to the previous portion */
    const void *const scanStart = input.buf + input.offset;
    const size_t remaining = input.size - input.offset;
    const void *const term = memchr(scanStart, recordTerminator, remaining);

    if (term) {
        /*
         * Record boundary found: align to the start of the next record.
         * The distance is taken as a char-pointer difference instead of
         * subtracting addresses reinterpret_cast to integers.
         */
        const size_t chunkSize = static_cast<size_t>(
            static_cast<const char *>(term) - static_cast<const char *>(scanStart));
        input.offset += chunkSize + sizeof(char) /* length of record terminator */;
        /* input.offset points at the start of the first complete record in the portion */
        return DONE;
    }

    if (state == END_OF_FILE || state == END_OF_PORTION) {
        /* the input ended without containing a record terminator */
        return REJECT;
    }

    VIAssert(state == START_OF_PORTION || state == OK);
    return INPUT_NEEDED;
}
process()
方法必须处理跨越部分边界的块。如果上一次调用到达了某个部分的末尾,该方法会设置一个标志。代码首先检查并处理这种情况。其逻辑与 alignPortion()
类似,因此示例直接调用 alignPortion() 来完成这部分划分工作。
/*
 * Finds chunk boundaries (record terminators) in the input.
 *
 * If pastPortion is set (a previous call saw END_OF_PORTION), the last chunk
 * extends into the next portion, so only the first record terminator there
 * has to be found; that is delegated to alignPortion(). Otherwise the buffer
 * is scanned for record terminators and the offset is moved past the last one.
 */
StreamState ExampleDelimitedUDChunker::process(
ServerInterface &srvInterface,
DataBuffer &input,
InputState input_state)
{
/* this chunker splits on a single-byte record terminator */
const size_t termLen = 1;
const char *terminator = &recordTerminator;
if (pastPortion) {
/*
 * Previous state was END_OF_PORTION, and the last chunk we will produce
 * extends beyond the portion we started with, into the next portion.
 * To be consistent with alignPortion(), that means finding the first
 * record boundary, and setting the chunk to be at that boundary.
 * Fortunately, this logic is identical to aligning the portion (with
 * some slight accounting for END_OF_FILE)!
 */
/* NOTE(review): called without a state argument; presumably the declaration
 * of alignPortion() supplies a default -- confirm in the header. */
const StreamState findLastTerminator = alignPortion(srvInterface, input);
switch (findLastTerminator) {
case DONE:
return DONE;
case INPUT_NEEDED:
if (input_state == END_OF_FILE) {
/* there is no more input where we might find a record terminator */
input.offset = input.size;
return DONE;
}
return INPUT_NEEDED;
default:
/*
 * A bare string literal is always true, so VIAssert("...") could never
 * fire. Assert on a false condition so an unexpected state is reported,
 * while keeping the message in the assertion text.
 */
VIAssert(false && "Invalid return state from alignPortion()");
}
/* reached for any other state only if VIAssert does not abort */
return findLastTerminator;
}
接下来,该方法在输入中查找记录终止符。如果输入到达了某个部分的末尾(END_OF_PORTION),它会设置该标志。
/*
 * Scan [input.offset, input.size) for record terminators. ret ends up just
 * past the LAST complete terminator found, or stays at input.offset if none
 * was found. term_index counts how many terminator bytes have matched so far.
 */
size_t ret = input.offset, term_index = 0;
for (size_t index = input.offset; index < input.size; ++index) {
const char c = input.buf[index];
if (c == terminator[term_index]) {
++term_index;
if (term_index == termLen) {
/* full terminator matched: a chunk may end right after it */
ret = index + 1;
term_index = 0;
}
continue;
} else if (term_index > 0) {
/*
 * A partial match failed: rewind so that, after the loop's ++index, the
 * scan restarts one byte after where the partial match began. With
 * termLen == 1 this branch is never taken.
 */
index -= term_index;
}
term_index = 0;
}
if (input_state == END_OF_PORTION) {
/*
 * Regardless of whether or not a record was found, the next chunk will extend
 * into the next portion.
 */
pastPortion = true;
}
最后,process()
移动输入偏移量并返回。
// if we were able to find some rows, move the offset to point at the start of the next (potential) row, or end of block
if (ret > input.offset) {
input.offset = ret;
return CHUNK_ALIGNED;
}
if (input_state == END_OF_FILE) {
input.offset = input.size;
return DONE;
}
return INPUT_NEEDED;
}
工厂实现
文件 ExampleDelimitedParser.cpp
定义了一个使用此 UDChunker
的工厂。由于块分割器支持分摊加载,因此工厂实现了 isChunkerApportionable()
:
/*
 * Reports whether the chunker supports apportioned loads.
 *
 * @return false when the "disable_chunker" parameter is present and true,
 *         true otherwise.
 */
virtual bool isChunkerApportionable(ServerInterface &srvInterface) {
    ParamReader params = srvInterface.getParamReader();
    const bool chunkerDisabled = params.containsParameter("disable_chunker")
                                 && params.getBoolRef("disable_chunker");
    return !chunkerDisabled;
}
prepareChunker()
方法创建块分割器:
/*
 * Creates the ExampleDelimitedUDChunker.
 *
 * @return NULL when the "disable_chunker" parameter is present and true;
 *         otherwise a chunker allocated with srvInterface.allocator.
 *
 * The record terminator comes from the optional "record_terminator"
 * parameter and defaults to "\n". Only its first byte is passed to the
 * chunker.
 * NOTE(review): a multi-byte record_terminator is silently truncated to its
 * first byte, and an empty one yields '\0' (std::string::operator[] at
 * size()); consider validating the parameter.
 */
virtual UDChunker* prepareChunker(ServerInterface &srvInterface,
                                  PerColumnParamReader &perColumnParamReader,
                                  PlanContext &planCtxt,
                                  const SizedColumnTypes &returnType)
{
    ParamReader params = srvInterface.getParamReader();
    if (params.containsParameter("disable_chunker") && params.getBoolRef("disable_chunker")) {
        return NULL;
    }

    std::string recordTerminator("\n");
    if (params.containsParameter("record_terminator")) {
        recordTerminator = params.getStringRef("record_terminator").str();
    }
    return vt_createFuncObject<ExampleDelimitedUDChunker>(srvInterface.allocator,
                                                          recordTerminator[0]);
}