C++ 示例:可取消的 UDSource

在 SDK 示例的 filelib.cpp 中找到的 FifoSource 示例演示了如何使用 cancel()isCanceled()。此源从指定的管道执行读取操作。与从文件读取不同,从管道读取可能会堵塞。因此,我们需要能够取消从此源加载数据。

为了管理取消操作,UDx 使用了管道,它是一种用于进程间通信的数据通道。某个进程可以将数据写入管道的写入端,并在另一个进程从管道的读取端读取数据之前保持可用。此示例不通过该管道传递数据;相反,它使用该管道来管理取消操作,如下面进一步所述。除了管道的两个文件描述符(每端一个)之外,UDx 还会为要从其读取的文件创建文件描述符。setup() 函数将创建管道,然后打开相应文件。

virtual void setup(ServerInterface &srvInterface) {
  // cancelPipe is a pipe used only for checking cancellation
  if (pipe(cancelPipe)) {
    vt_report_error(0, "Error opening control structure");
  }

  // handle to the named pipe from which we read data
  namedPipeFd = open(filename.c_str(), O_RDONLY | O_NONBLOCK);
  if (namedPipeFd < 0) {
    vt_report_error(0, "Error opening fifo [%s]", filename.c_str());
  }
}

现在有三个文件描述符:namedPipeFdcancelPipe[PIPE_READ]cancelPipe[PIPE_WRITE]。上述每个描述符最终都必须关闭。

此 UDx 使用 poll() 系统调用来等待数据从指定的管道到达 (namedPipeFd) 或等待取消查询 (cancelPipe[PIPE_READ])。process() 函数将执行轮询、检查结果、检查是否已取消查询、在需要时写入输出,然后返回结果。

virtual StreamState process(ServerInterface &srvInterface, DataBuffer &output) {
  struct pollfd pollfds[2] = {
    { namedPipeFd,           POLLIN, 0 },
    { cancelPipe[PIPE_READ], POLLIN, 0 }
  };

  if (poll(pollfds, 2, -1) < 0) {
    vt_report_error(1, "Error reading [%s]", filename.c_str());
  }

  if (pollfds[1].revents & (POLLIN | POLLHUP)) {
    /* This can only happen after cancel() has been called */
    VIAssert(isCanceled());
    return DONE;
  }

  VIAssert(pollfds[PIPE_READ].revents & (POLLIN | POLLHUP));

  const ssize_t amount = read(namedPipeFd, output.buf + output.offset, output.size - output.offset);
  if (amount < 0) {
    vt_report_error(1, "Error reading from fifo [%s]", filename.c_str());
  }

  if (amount == 0 || isCanceled()) {
    return DONE;
  } else {
    output.offset += amount;
    return OUTPUT_NEEDED;
  }
}

如果查询被取消,则 cancel() 函数会关闭管道的写入端。process() 下一次轮询输入时,它会在管道的读取端找不到输入时退出。否则,它会继续操作。此外,该函数还会调用 isCanceled() 以在返回 OUTPUT_NEEDED(表示已填满缓冲区且正在等待下游处理的信号)之前检查是否已取消查询。

cancel() 函数仅执行中断对 process() 的调用所需的工作。相反,始终需要执行(而不仅仅是为了取消查询)的清理是在 destroy() 或析构函数中完成的。cancel() 函数会关闭管道的写入端。(稍后将显示 helper 函数。)


virtual void cancel(ServerInterface &srvInterface) {
  closeIfNeeded(cancelPipe[PIPE_WRITE]);
}

cancel() 中关闭指定的管道并不安全,因为如果另一个进程(如另一个查询)要在 UDx 完成之前将文件描述符编号重用于新描述符,则关闭指定的管道可能会产生竞争条件。我们会改为在 destroy() 中关闭指定的管道以及管道的读取端。

virtual void destroy(ServerInterface &srvInterface) {
  closeIfNeeded(namedPipeFd);
  closeIfNeeded(cancelPipe[PIPE_READ]);
}

destroy() 中关闭管道的写入端并不安全,因为 cancel() 会关闭它且可以使用 destroy() 进行并发调用。因此,我们在析构函数中关闭管道的写入端。


~FifoSource() {
  closeIfNeeded(cancelPipe[PIPE_WRITE]);
}

Udx 会使用 helper 函数 closeIfNeeded() 来确保每个文件描述符正好关闭一次。

void closeIfNeeded(int &fd) {
  if (fd >= 0) {
    close(fd);
    fd = -1;
  }
}