C++ example: cancelable UDSource

The FifoSource example, found in filelib.cpp in the SDK examples, demonstrates use of cancel() and isCanceled().

The FifoSource example, found in filelib.cpp in the SDK examples, demonstrates use of cancel() and isCanceled(). This source reads from a named pipe. Unlike reads from files, reads from pipes can block. Therefore, we need to be able to cancel a load from this source.

To manage cancellation, the UDx uses a pipe, a data channel used for inter-process communication. A process can write data to the write end of the pipe, and it remains available until another process reads it from the read end of the pipe. This example doesn't pass data through this pipe; rather, it uses the pipe to manage cancellation, as explained further below. In addition to the pipe's two file descriptors (one for each end), the UDx creates a file descriptor for the file to read from. The setup() function creates the pipe and then opens the file.

virtual void setup(ServerInterface &srvInterface) {
  // cancelPipe is a pipe used only for checking cancellation
  if (pipe(cancelPipe)) {
    vt_report_error(0, "Error opening control structure");
  }

  // handle to the named pipe from which we read data
  namedPipeFd = open(filename.c_str(), O_RDONLY | O_NONBLOCK);
  if (namedPipeFd < 0) {
    vt_report_error(0, "Error opening fifo [%s]", filename.c_str());
  }
}

We now have three file descriptors: namedPipeFd, cancelPipe[PIPE_READ], and cancelPipe[PIPE_WRITE]. Each of these must eventually be closed.

This UDx uses the poll() system call to wait either for data to arrive from the named pipe (namedPipeFd) or for a cancellation (cancelPipe[PIPE_READ]). The process() function polls, checks for results, checks for cancellation, writes output if needed, and returns.

virtual StreamState process(ServerInterface &srvInterface, DataBuffer &output) {
  struct pollfd pollfds[2] = {
    { namedPipeFd,           POLLIN, 0 },
    { cancelPipe[PIPE_READ], POLLIN, 0 }
  };

  if (poll(pollfds, 2, -1) < 0) {
    vt_report_error(1, "Error reading [%s]", filename.c_str());
  }

  if (pollfds[1].revents & (POLLIN | POLLHUP)) {
    /* This can only happen after cancel() has been called */
    VIAssert(isCanceled());
    return DONE;
  }

  VIAssert(pollfds[PIPE_READ].revents & (POLLIN | POLLHUP));

  const ssize_t amount = read(namedPipeFd, output.buf + output.offset, output.size - output.offset);
  if (amount < 0) {
    vt_report_error(1, "Error reading from fifo [%s]", filename.c_str());
  }

  if (amount == 0 || isCanceled()) {
    return DONE;
  } else {
    output.offset += amount;
    return OUTPUT_NEEDED;
  }
}

If the query is canceled, the cancel() function closes the write end of the pipe. The next time process() polls for input, it finds no input on the read end of the pipe and exits. Otherwise, it continues. The function also calls isCanceled() to check for cancellation before returning OUTPUT_NEEDED, the signal that it has filled its buffer and is waiting for it to be processed downstream.

The cancel() function does only the work needed to interrupt a call to process(). Cleanup that is always needed, not just for cancellation, is instead done in destroy() or the destructor. The cancel() function closes the write end of the pipe. (The helper function will be shown later.)


virtual void cancel(ServerInterface &srvInterface) {
  closeIfNeeded(cancelPipe[PIPE_WRITE]);
}

It is not safe to close the named pipe in cancel(), because closing it could create a race condition if another process (like another query) were to reuse the file descriptor number for a new descriptor before the UDx finishes. Instead we close it, and the read end of the pipe, in destroy().

virtual void destroy(ServerInterface &srvInterface) {
  closeIfNeeded(namedPipeFd);
  closeIfNeeded(cancelPipe[PIPE_READ]);
}

It is not safe to close the write end of the pipe in destroy(), because cancel() closes it and can be called concurrently with destroy(). Therefore, we close it in the destructor.


~FifoSource() {
  closeIfNeeded(cancelPipe[PIPE_WRITE]);
}

The UDx uses a helper function, closeIfNeeded(), to make sure each file descriptor is closed exactly once.

void closeIfNeeded(int &fd) {
  if (fd >= 0) {
    close(fd);
    fd = -1;
  }
}