C++ 示例:可取消的 UDSource
在 SDK 示例的 filelib.cpp
中找到的 FifoSource
示例演示了如何使用 cancel()
和 isCanceled()
。此源从指定的管道执行读取操作。与从文件读取不同,从管道读取可能会堵塞。因此,我们需要能够取消从此源加载数据。
为了管理取消操作,UDx 使用了管道,它是一种用于进程间通信的数据通道。某个进程可以将数据写入管道的写入端,并在另一个进程从管道的读取端读取数据之前保持可用。此示例不通过该管道传递数据;相反,它使用该管道来管理取消操作,如下面进一步所述。除了管道的两个文件描述符(每端一个)之外,UDx 还会为要从其读取的文件创建文件描述符。setup()
函数将创建管道,然后打开相应文件。
virtual void setup(ServerInterface &srvInterface) {
// cancelPipe is a pipe used only for checking cancellation
if (pipe(cancelPipe)) {
vt_report_error(0, "Error opening control structure");
}
// handle to the named pipe from which we read data
namedPipeFd = open(filename.c_str(), O_RDONLY | O_NONBLOCK);
if (namedPipeFd < 0) {
vt_report_error(0, "Error opening fifo [%s]", filename.c_str());
}
}
现在有三个文件描述符:namedPipeFd
、cancelPipe[PIPE_READ]
和 cancelPipe[PIPE_WRITE]
。上述每个描述符最终都必须关闭。
此 UDx 使用 poll()
系统调用来等待数据从指定的管道到达 (namedPipeFd
) 或等待取消查询 (cancelPipe[PIPE_READ]
)。process()
函数将执行轮询、检查结果、检查是否已取消查询、在需要时写入输出,然后返回结果。
virtual StreamState process(ServerInterface &srvInterface, DataBuffer &output) {
struct pollfd pollfds[2] = {
{ namedPipeFd, POLLIN, 0 },
{ cancelPipe[PIPE_READ], POLLIN, 0 }
};
if (poll(pollfds, 2, -1) < 0) {
vt_report_error(1, "Error reading [%s]", filename.c_str());
}
if (pollfds[1].revents & (POLLIN | POLLHUP)) {
/* This can only happen after cancel() has been called */
VIAssert(isCanceled());
return DONE;
}
VIAssert(pollfds[PIPE_READ].revents & (POLLIN | POLLHUP));
const ssize_t amount = read(namedPipeFd, output.buf + output.offset, output.size - output.offset);
if (amount < 0) {
vt_report_error(1, "Error reading from fifo [%s]", filename.c_str());
}
if (amount == 0 || isCanceled()) {
return DONE;
} else {
output.offset += amount;
return OUTPUT_NEEDED;
}
}
如果查询被取消,则 cancel()
函数会关闭管道的写入端。process()
下一次轮询输入时,它会在管道的读取端找不到输入时退出。否则,它会继续操作。此外,该函数还会调用 isCanceled()
以在返回 OUTPUT_NEEDED
(表示已填满缓冲区且正在等待下游处理的信号)之前检查是否已取消查询。
cancel()
函数仅执行中断对 process()
的调用所需的工作。相反,始终需要执行(而不仅仅是为了取消查询)的清理是在 destroy()
或析构函数中完成的。cancel()
函数会关闭管道的写入端。(稍后将显示 helper 函数。)
virtual void cancel(ServerInterface &srvInterface) {
closeIfNeeded(cancelPipe[PIPE_WRITE]);
}
在 cancel()
中关闭指定的管道并不安全,因为如果另一个进程(如另一个查询)要在 UDx 完成之前将文件描述符编号重用于新描述符,则关闭指定的管道可能会产生竞争条件。我们会改为在 destroy()
中关闭指定的管道以及管道的读取端。
virtual void destroy(ServerInterface &srvInterface) {
closeIfNeeded(namedPipeFd);
closeIfNeeded(cancelPipe[PIPE_READ]);
}
在 destroy()
中关闭管道的写入端并不安全,因为 cancel()
会关闭它且可以使用 destroy()
进行并发调用。因此,我们在析构函数中关闭管道的写入端。
~FifoSource() {
closeIfNeeded(cancelPipe[PIPE_WRITE]);
}
Udx 会使用 helper 函数 closeIfNeeded()
来确保每个文件描述符正好关闭一次。
void closeIfNeeded(int &fd) {
if (fd >= 0) {
close(fd);
fd = -1;
}
}