C++ 示例:可取消的 UDSource
在 SDK 示例的 filelib.cpp 中找到的 FifoSource 示例演示了如何使用 cancel() 和 isCanceled()。此源从指定的管道执行读取操作。与从文件读取不同,从管道读取可能会堵塞。因此,我们需要能够取消从此源加载数据。
为了管理取消操作,UDx 使用了管道,它是一种用于进程间通信的数据通道。某个进程可以将数据写入管道的写入端,并在另一个进程从管道的读取端读取数据之前保持可用。此示例不通过该管道传递数据;相反,它使用该管道来管理取消操作,如下面进一步所述。除了管道的两个文件描述符(每端一个)之外,UDx 还会为要从其读取的文件创建文件描述符。setup() 函数将创建管道,然后打开相应文件。
virtual void setup(ServerInterface &srvInterface) {
// cancelPipe is a pipe used only for checking cancellation
if (pipe(cancelPipe)) {
vt_report_error(0, "Error opening control structure");
}
// handle to the named pipe from which we read data
namedPipeFd = open(filename.c_str(), O_RDONLY | O_NONBLOCK);
if (namedPipeFd < 0) {
vt_report_error(0, "Error opening fifo [%s]", filename.c_str());
}
}
现在有三个文件描述符:namedPipeFd、cancelPipe[PIPE_READ] 和 cancelPipe[PIPE_WRITE]。上述每个描述符最终都必须关闭。
此 UDx 使用 poll() 系统调用来等待数据从指定的管道到达 (namedPipeFd) 或等待取消查询 (cancelPipe[PIPE_READ])。process() 函数将执行轮询、检查结果、检查是否已取消查询、在需要时写入输出,然后返回结果。
virtual StreamState process(ServerInterface &srvInterface, DataBuffer &output) {
struct pollfd pollfds[2] = {
{ namedPipeFd, POLLIN, 0 },
{ cancelPipe[PIPE_READ], POLLIN, 0 }
};
if (poll(pollfds, 2, -1) < 0) {
vt_report_error(1, "Error reading [%s]", filename.c_str());
}
if (pollfds[1].revents & (POLLIN | POLLHUP)) {
/* This can only happen after cancel() has been called */
VIAssert(isCanceled());
return DONE;
}
VIAssert(pollfds[PIPE_READ].revents & (POLLIN | POLLHUP));
const ssize_t amount = read(namedPipeFd, output.buf + output.offset, output.size - output.offset);
if (amount < 0) {
vt_report_error(1, "Error reading from fifo [%s]", filename.c_str());
}
if (amount == 0 || isCanceled()) {
return DONE;
} else {
output.offset += amount;
return OUTPUT_NEEDED;
}
}
如果查询被取消,则 cancel() 函数会关闭管道的写入端。process() 下一次轮询输入时,它会在管道的读取端找不到输入时退出。否则,它会继续操作。此外,该函数还会调用 isCanceled() 以在返回 OUTPUT_NEEDED(表示已填满缓冲区且正在等待下游处理的信号)之前检查是否已取消查询。
cancel() 函数仅执行中断对 process() 的调用所需的工作。相反,始终需要执行(而不仅仅是为了取消查询)的清理是在 destroy() 或析构函数中完成的。cancel() 函数会关闭管道的写入端。(稍后将显示 helper 函数。)
virtual void cancel(ServerInterface &srvInterface) {
closeIfNeeded(cancelPipe[PIPE_WRITE]);
}
在 cancel() 中关闭指定的管道并不安全,因为如果另一个进程(如另一个查询)要在 UDx 完成之前将文件描述符编号重用于新描述符,则关闭指定的管道可能会产生竞争条件。我们会改为在 destroy() 中关闭指定的管道以及管道的读取端。
virtual void destroy(ServerInterface &srvInterface) {
closeIfNeeded(namedPipeFd);
closeIfNeeded(cancelPipe[PIPE_READ]);
}
在 destroy() 中关闭管道的写入端并不安全,因为 cancel() 会关闭它且可以使用 destroy() 进行并发调用。因此,我们在析构函数中关闭管道的写入端。
~FifoSource() {
closeIfNeeded(cancelPipe[PIPE_WRITE]);
}
Udx 会使用 helper 函数 closeIfNeeded() 来确保每个文件描述符正好关闭一次。
void closeIfNeeded(int &fd) {
if (fd >= 0) {
close(fd);
fd = -1;
}
}