Handling cancel requests

Users of your UDx might cancel the operation while it is running. How Vertica handles the cancellation of the query and your UDx depends on whether your UDx is running in fenced or unfenced mode:

  • If your UDx is running in unfenced mode, Vertica either stops the function when it requests a new block of input or output, or waits until your function completes running and discards the results.

  • If your UDx is running in fenced mode, Vertica kills the zygote process that is running your function if it continues processing past a timeout.

In addition, you can implement the cancel() method in any UDx to perform any necessary additional work. Vertica calls your cancel() method when a query is canceled. This cancellation can occur at any time during your UDx's lifetime, from setup() through destroy().

You can check for cancellation before starting an expensive operation by calling isCanceled().

1 - Implementing the cancel callback

Your UDx can implement a cancel() callback function. Vertica calls this function if the query that invoked the UDx has been canceled.

You usually implement this function to perform an orderly shutdown of any additional processing that your UDx spawned. For example, you can have your cancel() function shut down threads that your UDx has spawned or signal a third-party library that it needs to stop processing and exit. Your cancel() function should leave your UDx's function class ready to be destroyed, because Vertica calls the UDx's destroy() function after the cancel() function has exited.

A UDx's default cancel() behavior is to do nothing.
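The shutdown pattern described above can be illustrated with the following minimal standalone sketch. It does not use the Vertica SDK. ThreadedWorker and its setup(), cancel(), and destroy() methods are hypothetical stand-ins that only mirror the UDx lifecycle. Here, cancel() only sets an atomic flag and returns, and destroy() joins the helper thread, so the object is ready to be destroyed once cancel() has exited.

```cpp
#include <atomic>
#include <cassert>
#include <chrono>
#include <thread>

// Hypothetical class that mimics a UDx which spawns a helper thread.
// It is NOT the Vertica SDK; only the method names mirror the lifecycle.
class ThreadedWorker {
public:
    // Spawns a background thread that runs until asked to stop.
    void setup() {
        worker_ = std::thread([this] {
            while (!stop_.load()) {
                ++iterations_;  // stand-in for real background work
                std::this_thread::sleep_for(std::chrono::milliseconds(1));
            }
        });
    }

    // May be called from another thread, so it only touches an atomic
    // flag and returns quickly instead of blocking on the worker.
    void cancel() { stop_.store(true); }

    // Runs after cancel() (or after normal completion): make sure the
    // thread stops, then wait for it so nothing outlives the object.
    void destroy() {
        stop_.store(true);
        if (worker_.joinable()) {
            worker_.join();
        }
    }

    bool threadJoinable() const { return worker_.joinable(); }
    long iterations() const { return iterations_.load(); }

private:
    std::atomic<bool> stop_{false};
    std::atomic<long> iterations_{0};
    std::thread worker_;
};
```

Keeping cancel() down to a flag store fits the contract that follows: cancel() can run concurrently with other methods, and a long-running cancellation might be aborted.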

The contract for cancel() is:

  • Vertica will call cancel() at most once per UDx instance.

  • Vertica can call cancel() concurrently with any other method of the UDx object except the constructor and destructor.

  • Vertica can call cancel() from another thread, so implementations should be thread-safe.

  • Vertica will call cancel() for either an explicit user cancellation or an error in the query.

  • Vertica does not guarantee that cancel() will run to completion. Long-running cancellations might be aborted.

The call to cancel() is not synchronized in any way with your UDx's other functions. If you need your processing function to exit before your cancel() function performs some action (killing threads, for example), you must have the two functions synchronize their actions.
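One way to synchronize the two uses a mutex and a condition variable. The sketch below assumes that process() runs on one thread while cancel() arrives on another. cancel() raises a flag and then waits until process() reports that it has exited before performing its action. SyncedUDx and its event log are hypothetical and independent of the Vertica SDK. Because Vertica might abort a long-running cancel(), this wait is only reasonable if process() checks the flag often enough to exit promptly.

```cpp
#include <atomic>
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <mutex>
#include <string>
#include <thread>
#include <vector>

// Hypothetical UDx-like class (not the Vertica SDK) in which cancel()
// must not act until process() has stopped using shared resources.
class SyncedUDx {
public:
    // Runs on the processing thread.
    void process() {
        std::unique_lock<std::mutex> lock(mu_);
        if (canceled_.load()) {
            return;  // cancel() already ran; do not start any work
        }
        inProcess_ = true;
        lock.unlock();

        // Stand-in for real work; checks the flag between steps.
        while (!canceled_.load()) {
            std::this_thread::sleep_for(std::chrono::milliseconds(1));
        }

        lock.lock();
        inProcess_ = false;
        events_.push_back("process exited");
        cv_.notify_all();
    }

    // Runs on another thread; waits for process() to leave before acting.
    void cancel() {
        std::unique_lock<std::mutex> lock(mu_);
        canceled_.store(true);
        cv_.wait(lock, [this] { return !inProcess_; });
        events_.push_back("cancel action");  // stand-in for e.g. stopping threads
    }

    bool isProcessing() {
        std::lock_guard<std::mutex> lock(mu_);
        return inProcess_;
    }

    std::vector<std::string> events() {
        std::lock_guard<std::mutex> lock(mu_);
        return events_;
    }

private:
    std::mutex mu_;
    std::condition_variable cv_;
    std::atomic<bool> canceled_{false};
    bool inProcess_ = false;           // guarded by mu_
    std::vector<std::string> events_;  // guarded by mu_
};
```

Setting the flag and testing inProcess_ under the same mutex closes the window in which cancel() could act just before process() starts using the resource.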

Vertica always calls destroy() if it called setup(). Cancellation does not prevent destruction.

See C++ example: cancelable UDSource for an example that implements cancel().

2 - Checking for cancellation during execution

You can call the isCanceled() method to check for user cancellation. Typically, you check for cancellation from the method that does the main processing in your UDx, before beginning expensive operations. If isCanceled() returns true, the query has been canceled and your method should exit immediately to avoid wasting CPU time. If your UDx is not running in fenced mode, Vertica cannot halt your function and has to wait for it to finish. If it is running in fenced mode, Vertica eventually kills the side process running it.
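The check-before-expensive-work pattern can be sketched without the SDK as follows. processChunks() is a hypothetical helper. Its isCanceled parameter is any callable that stands in for the UDx's isCanceled() method, and the summation stands in for an expensive operation.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <numeric>
#include <vector>

// Hypothetical processing loop; `isCanceled` stands in for the UDx method.
// Returns how many chunks were processed and adds their sums to `total`.
std::size_t processChunks(const std::vector<std::vector<int>>& chunks,
                          const std::function<bool()>& isCanceled,
                          long long& total) {
    std::size_t processed = 0;
    for (const auto& chunk : chunks) {
        // Check before each expensive step, so a canceled query
        // never starts new work.
        if (isCanceled()) {
            break;
        }
        total += std::accumulate(chunk.begin(), chunk.end(), 0LL);
        ++processed;
    }
    return processed;
}
```

For example, if the cancellation check starts returning true on its fourth call, only the first three chunks are processed.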

See C++ example: cancelable UDSource for an example that uses isCanceled().

3 - C++ example: cancelable UDSource

The FifoSource example, found in filelib.cpp in the SDK examples, demonstrates use of cancel() and isCanceled(). This source reads from a named pipe. Unlike reads from files, reads from pipes can block. Therefore, we need to be able to cancel a load from this source.

To manage cancellation, the UDx uses a pipe, a data channel used for inter-process communication. A process can write data to the write end of the pipe, and it remains available until another process reads it from the read end of the pipe. This example doesn't pass data through this pipe; rather, it uses the pipe to manage cancellation, as explained further below. In addition to the pipe's two file descriptors (one for each end), the UDx creates a file descriptor for the file to read from. The setup() function creates the pipe and then opens the file.

virtual void setup(ServerInterface &srvInterface) {
  // cancelPipe is a pipe used only for checking cancellation
  if (pipe(cancelPipe)) {
    vt_report_error(0, "Error opening control structure");
  }

  // handle to the named pipe from which we read data
  namedPipeFd = open(filename.c_str(), O_RDONLY | O_NONBLOCK);
  if (namedPipeFd < 0) {
    vt_report_error(0, "Error opening fifo [%s]", filename.c_str());
  }
}

We now have three file descriptors: namedPipeFd, cancelPipe[PIPE_READ], and cancelPipe[PIPE_WRITE]. Each of these must eventually be closed.
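This design relies on two pipe properties, which a small standalone POSIX program can check. First, bytes written to the write end come out of the read end. Second, once every write end has been closed, read() on the read end returns 0 (end of file) instead of blocking. The helpers pipeRoundTrip() and readAfterWriterClosed() are hypothetical names used only for this sketch.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <sys/types.h>
#include <unistd.h>

// Writes a short message into a new pipe and reads it back.
// Returns an empty string on error.
std::string pipeRoundTrip(const std::string& msg) {
    char buf[256];
    // Empty: read() would block forever. Too long: would not fit in buf.
    if (msg.empty() || msg.size() > sizeof buf) {
        return "";
    }
    int fds[2];  // fds[0] is the read end, fds[1] the write end
    if (pipe(fds) != 0) {
        return "";
    }
    ssize_t written = write(fds[1], msg.data(), msg.size());
    ssize_t got = (written > 0) ? read(fds[0], buf, sizeof buf) : -1;
    close(fds[0]);
    close(fds[1]);
    if (got < 0) {
        return "";
    }
    return std::string(buf, static_cast<std::size_t>(got));
}

// Closes the only write end of an empty pipe, then reads the read end.
// With no writers left, read() returns 0 (end of file) instead of blocking.
ssize_t readAfterWriterClosed() {
    int fds[2];
    if (pipe(fds) != 0) {
        return -1;
    }
    close(fds[1]);
    char c;
    ssize_t got = read(fds[0], &c, 1);
    close(fds[0]);
    return got;
}
```

The second property lets closing cancelPipe[PIPE_WRITE] act as a wake-up signal for anything waiting on cancelPipe[PIPE_READ].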

This UDx uses the poll() system call to wait either for data to arrive from the named pipe (namedPipeFd) or for a cancellation (cancelPipe[PIPE_READ]). The process() function polls, checks for results, checks for cancellation, writes output if needed, and returns.

virtual StreamState process(ServerInterface &srvInterface, DataBuffer &output) {
  struct pollfd pollfds[2] = {
    { namedPipeFd,           POLLIN, 0 },
    { cancelPipe[PIPE_READ], POLLIN, 0 }
  };

  if (poll(pollfds, 2, -1) < 0) {
    vt_report_error(1, "Error reading [%s]", filename.c_str());
  }

  if (pollfds[1].revents & (POLLIN | POLLHUP)) {
    /* This can only happen after cancel() has been called */
    VIAssert(isCanceled());
    return DONE;
  }

  // The cancel pipe did not fire, so pollfds[0] (the named pipe) must have.
  VIAssert(pollfds[0].revents & (POLLIN | POLLHUP));

  const ssize_t amount = read(namedPipeFd, output.buf + output.offset, output.size - output.offset);
  if (amount < 0) {
    vt_report_error(1, "Error reading from fifo [%s]", filename.c_str());
  }

  if (amount == 0 || isCanceled()) {
    return DONE;
  } else {
    output.offset += amount;
    return OUTPUT_NEEDED;
  }
}

If the query is canceled, the cancel() function closes the write end of the cancel pipe. Closing the write end makes the read end report end of file. The poll() call in process(), whether it is already blocked or runs next, then returns with POLLHUP or POLLIN set on cancelPipe[PIPE_READ], and process() returns DONE. Otherwise, process() reads the available data from the named pipe. It also calls isCanceled() to check for cancellation before returning OUTPUT_NEEDED, the signal that it has filled its buffer and is waiting for it to be processed downstream.
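The wake-up mechanism can be reproduced outside Vertica with plain POSIX calls. In this sketch, waitForDataOrCancel() blocks in poll() on a data descriptor and on the read end of a cancel pipe, the same layout that process() uses. cancelLater() starts a second thread that plays the role of cancel() by closing the cancel pipe's write end. Both helpers are hypothetical. Like process(), the sketch checks the cancel descriptor first, and it does not retry poll() on EINTR.

```cpp
#include <cassert>
#include <chrono>
#include <poll.h>
#include <thread>
#include <unistd.h>

enum class WaitResult { Data, Canceled, Error };

// Blocks until dataFd is readable or the cancel pipe's write end is closed.
// Hypothetical helper modeled on FifoSource::process(); not part of any SDK.
WaitResult waitForDataOrCancel(int dataFd, int cancelReadFd) {
    struct pollfd fds[2] = {
        { dataFd,       POLLIN, 0 },
        { cancelReadFd, POLLIN, 0 }
    };
    if (poll(fds, 2, -1) < 0) {
        return WaitResult::Error;  // EINTR is not retried in this sketch
    }
    // Check cancellation first, as process() does.
    if (fds[1].revents & (POLLIN | POLLHUP)) {
        return WaitResult::Canceled;
    }
    if (fds[0].revents & (POLLIN | POLLHUP)) {
        return WaitResult::Data;
    }
    return WaitResult::Error;
}

// Simulates cancel() arriving on another thread: after `delay`, close the
// write end of the cancel pipe. The caller must join the returned thread.
std::thread cancelLater(int cancelWriteFd, std::chrono::milliseconds delay) {
    return std::thread([cancelWriteFd, delay] {
        std::this_thread::sleep_for(delay);
        close(cancelWriteFd);
    });
}
```

No data ever needs to travel through the cancel pipe. Closing its write end is the entire signal, so it cannot be lost or fill up.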

The cancel() function does only the work needed to interrupt a call to process(). Cleanup that is always needed, not just for cancellation, is done in destroy() or the destructor instead. The cancel() function closes the write end of the cancel pipe. (The closeIfNeeded() helper is shown at the end of this section.)

virtual void cancel(ServerInterface &srvInterface) {
  closeIfNeeded(cancelPipe[PIPE_WRITE]);
}

It is not safe to close the named pipe in cancel(), because cancel() can run concurrently with process(). Once a descriptor is closed, the next open() or pipe() call anywhere in the same process (for example, on behalf of another query) can reuse its number, and process() could then read from that unrelated descriptor. Instead, we close the named pipe, and the read end of the cancel pipe, in destroy().

virtual void destroy(ServerInterface &srvInterface) {
  closeIfNeeded(namedPipeFd);
  closeIfNeeded(cancelPipe[PIPE_READ]);
}
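The descriptor-reuse hazard is easy to demonstrate, because POSIX requires open() and pipe() to return the lowest-numbered descriptor that is not currently open. In this standalone sketch, reopenAfterClose() is a hypothetical name. Closing a descriptor and then opening /dev/null yields the same number. Any code still holding the old number would then silently use the new file.

```cpp
#include <cassert>
#include <fcntl.h>
#include <unistd.h>

// Closes `fd`, then opens an unrelated file. open() returns the lowest free
// descriptor number, so if nothing lower is free, it reuses `fd`'s number.
int reopenAfterClose(int fd) {
    close(fd);
    return open("/dev/null", O_RDONLY);
}
```

A stale copy of the number held by another thread gives no error. It simply refers to the new file.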

It is not safe to close the write end of the pipe in destroy(), because cancel() closes it and can be called concurrently with destroy(). Therefore, we close it in the destructor.


~FifoSource() {
  closeIfNeeded(cancelPipe[PIPE_WRITE]);
}

The UDx uses a helper function, closeIfNeeded(), to make sure each file descriptor is closed exactly once.

void closeIfNeeded(int &fd) {
  if (fd >= 0) {
    close(fd);
    fd = -1;
  }
}
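closeIfNeeded() treats any negative value as "already closed", so it only works if every descriptor starts out as -1. Otherwise, destroy() or the destructor could close whatever number an uninitialized member happened to hold. For example, destroy() still runs if setup() reports an error before opening the named pipe. The constructor is not shown above. The following standalone sketch assumes that PIPE_READ and PIPE_WRITE are 0 and 1, the indices that pipe() uses for the read and write ends. It uses a hypothetical FdMembers struct in place of FifoSource.

```cpp
#include <cassert>
#include <unistd.h>

// Assumed index constants: pipe() stores the read end at index 0 and the
// write end at index 1.
enum { PIPE_READ = 0, PIPE_WRITE = 1 };

// Hypothetical stand-in for FifoSource's descriptor members.
struct FdMembers {
    int namedPipeFd = -1;            // -1 means "not open"
    int cancelPipe[2] = { -1, -1 };
};

// Same logic as the helper above: close once, then mark as closed.
void closeIfNeeded(int &fd) {
    if (fd >= 0) {
        close(fd);
        fd = -1;
    }
}
```

With these initial values, every cleanup path can call closeIfNeeded() unconditionally, whether or not setup() got far enough to open anything.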