这是本节的多页打印视图。 点击此处打印.

返回本页常规视图.

处理取消请求

UDx 的用户可能会在运行时取消相应操作。Vertica 如何处理查询和 UDx 的取消取决于 UDx 正在隔离模式还是非隔离模式下运行。

  • 如果 UDx 正在非隔离模式下运行,则 Vertica 会在函数请求新的输入块或输出块时停止该函数,或者会等到该函数运行完成并丢弃结果。

  • 如果 UDx 正在隔离和非隔离模式模式下运行,则当正在运行函数的 zygote 进程在超时后继续处理函数时,Vertica 会终止该进程。

此外,您还可以在任何 UDx 中实施 cancel() 方法来执行任何必要的额外工作。取消查询时,Vertica 会调用您的函数。在 Udx 的生命周期内(从 setup()destroy())的任何时间都可能发生这种取消。

通过调用 isCanceled(),您可以在开始执行代价高昂的操作之前检查是否已取消查询。

1 - 实施 Cancel 回调

UDx 可以实施 cancel() 回调函数。如果调用 UDx 的查询已被取消,Vertica 将调用此函数。

通常可以实施此函数以对 UDx 已生成的任何附加处理执行有序关闭。例如,您可以让 cancel() 函数关闭 UDx 已生成的线程,或者也可以让该函数向第三方库指示它需要停止处理并退出。cancel() 函数应使 UDx 的函数类准备好进行销毁,因为 Vertica 会在 cancel() 函数已退出之后调用 UDx 的 destroy() 函数。

UDx 的默认 cancel() 行为是什么都不做。

cancel() 的合约为:

  • 对于每个 UDx 实例,Vertica 最多会调用一次 cancel()

  • Vertica 可以与 UDx 对象的任何其他方法(构造函数和析构函数除外)同时调用 cancel()

  • Vertica 可以从另一个线程调用 cancel(),因此实施应当是线程安全的。

  • Vertica 将调用 cancel() 来处理明确的用户取消或查询错误。

  • Vertica 不保证 cancel() 将运行完成。长期取消可能会被中止。

cancel() 的调用不会以任何方式与 UDx 的其他函数同步。如果要求处理函数在 cancel() 函数执行某项操作(例如终止线程)之前退出,您必须让这两个函数同步其操作。

如果调用 setup(),则 Vertica 始终调用 destroy()。取消并不能防止破坏。

有关实施 cancel() 的示例,请参阅 C++ 示例:可取消的 UDSource

2 - 在执行期间检查是否已取消查询

您可以调用 isCanceled() 方法来检查用户是否已取消查询。通常,在开始执行代价高昂的操作之前,您会使用在 UDx 中进行主要处理的方法来检查是否已取消查询。如果 isCanceled() 返回 true,则表明查询已被取消,您的方法应立即退出以防止浪费 CPU 时间。如果 UDx 未在隔离模式下运行,则 Vertica 无法停止函数,并且必须等待函数完成。如果 UDx 在隔离模式下运行,Vertica 最终会终止运行它的从属进程。

有关使用 isCanceled() 的示例,请参阅 C++ 示例:可取消的 UDSource

3 - C++ 示例:可取消的 UDSource

在 SDK 示例的 filelib.cpp 中找到的 FifoSource 示例演示了如何使用 cancel()isCanceled()。此源从指定的管道执行读取操作。与从文件读取不同,从管道读取可能会堵塞。因此,我们需要能够取消从此源加载数据。

为了管理取消操作,UDx 使用了管道,它是一种用于进程间通信的数据通道。某个进程可以将数据写入管道的写入端,并在另一个进程从管道的读取端读取数据之前保持可用。此示例不通过该管道传递数据;相反,它使用该管道来管理取消操作,如下面进一步所述。除了管道的两个文件描述符(每端一个)之外,UDx 还会为要从其读取的文件创建文件描述符。setup() 函数将创建管道,然后打开相应文件。

virtual void setup(ServerInterface &srvInterface) {
  // cancelPipe is a pipe used only for checking cancellation
  if (pipe(cancelPipe)) {
    vt_report_error(0, "Error opening control structure");
  }

  // handle to the named pipe from which we read data
  namedPipeFd = open(filename.c_str(), O_RDONLY | O_NONBLOCK);
  if (namedPipeFd < 0) {
    vt_report_error(0, "Error opening fifo [%s]", filename.c_str());
  }
}

现在有三个文件描述符:namedPipeFdcancelPipe[PIPE_READ]cancelPipe[PIPE_WRITE]。上述每个描述符最终都必须关闭。

此 UDx 使用 poll() 系统调用来等待数据从指定的管道到达 (namedPipeFd) 或等待取消查询 (cancelPipe[PIPE_READ])。process() 函数将执行轮询、检查结果、检查是否已取消查询、在需要时写入输出,然后返回结果。

virtual StreamState process(ServerInterface &srvInterface, DataBuffer &output) {
  struct pollfd pollfds[2] = {
    { namedPipeFd,           POLLIN, 0 },
    { cancelPipe[PIPE_READ], POLLIN, 0 }
  };

  if (poll(pollfds, 2, -1) < 0) {
    vt_report_error(1, "Error reading [%s]", filename.c_str());
  }

  if (pollfds[1].revents & (POLLIN | POLLHUP)) {
    /* This can only happen after cancel() has been called */
    VIAssert(isCanceled());
    return DONE;
  }

  VIAssert(pollfds[PIPE_READ].revents & (POLLIN | POLLHUP));

  const ssize_t amount = read(namedPipeFd, output.buf + output.offset, output.size - output.offset);
  if (amount < 0) {
    vt_report_error(1, "Error reading from fifo [%s]", filename.c_str());
  }

  if (amount == 0 || isCanceled()) {
    return DONE;
  } else {
    output.offset += amount;
    return OUTPUT_NEEDED;
  }
}

如果查询被取消,则 cancel() 函数会关闭管道的写入端。process() 下一次轮询输入时,它会在管道的读取端找不到输入时退出。否则,它会继续操作。此外,该函数还会调用 isCanceled() 以在返回 OUTPUT_NEEDED(表示已填满缓冲区且正在等待下游处理的信号)之前检查是否已取消查询。

cancel() 函数仅执行中断对 process() 的调用所需的工作。相反,始终需要执行(而不仅仅是为了取消查询)的清理是在 destroy() 或析构函数中完成的。cancel() 函数会关闭管道的写入端。(稍后将显示 helper 函数。)


virtual void cancel(ServerInterface &srvInterface) {
  closeIfNeeded(cancelPipe[PIPE_WRITE]);
}

cancel() 中关闭指定的管道并不安全,因为如果另一个进程(如另一个查询)要在 UDx 完成之前将文件描述符编号重用于新描述符,则关闭指定的管道可能会产生竞争条件。我们会改为在 destroy() 中关闭指定的管道以及管道的读取端。

virtual void destroy(ServerInterface &srvInterface) {
  closeIfNeeded(namedPipeFd);
  closeIfNeeded(cancelPipe[PIPE_READ]);
}

destroy() 中关闭管道的写入端并不安全,因为 cancel() 会关闭它且可以使用 destroy() 进行并发调用。因此,我们在析构函数中关闭管道的写入端。


~FifoSource() {
  closeIfNeeded(cancelPipe[PIPE_WRITE]);
}

Udx 会使用 helper 函数 closeIfNeeded() 来确保每个文件描述符正好关闭一次。

void closeIfNeeded(int &fd) {
  if (fd >= 0) {
    close(fd);
    fd = -1;
  }
}