这是本节的多页打印视图。
点击此处打印.
返回本页常规视图.
处理取消请求
UDx 的用户可能会在运行时取消相应操作。Vertica 如何处理查询和 UDx 的取消取决于 UDx 正在隔离模式还是非隔离模式下运行。
此外,您还可以在任何 UDx 中实施 cancel()
方法来执行任何必要的额外工作。取消查询时,Vertica 会调用您的函数。在 Udx 的生命周期内(从 setup()
到 destroy()
)的任何时间都可能发生这种取消。
通过调用 isCanceled()
,您可以在开始执行代价高昂的操作之前检查是否已取消查询。
1 - 实施 Cancel 回调
UDx 可以实施 cancel()
回调函数。如果调用 UDx 的查询已被取消,Vertica 将调用此函数。
通常可以实施此函数以对 UDx 已生成的任何附加处理执行有序关闭。例如,您可以让 cancel()
函数关闭 UDx 已生成的线程,或者也可以让该函数向第三方库指示它需要停止处理并退出。cancel()
函数应使 UDx 的函数类准备好进行销毁,因为 Vertica 会在 cancel()
函数已退出之后调用 UDx 的 destroy()
函数。
UDx 的默认 cancel()
行为是什么都不做。
cancel()
的合约为:
-
对于每个 UDx 实例,Vertica 最多会调用一次 cancel()
。
-
Vertica 可以与 UDx 对象的任何其他方法(构造函数和析构函数除外)同时调用 cancel()
。
-
Vertica 可以从另一个线程调用 cancel()
,因此实施应当是线程安全的。
-
Vertica 将调用 cancel()
来处理明确的用户取消或查询错误。
-
Vertica 不保证 cancel()
将运行完成。长期取消可能会被中止。
对 cancel()
的调用不会以任何方式与 UDx 的其他函数同步。如果要求处理函数在 cancel()
函数执行某项操作(例如终止线程)之前退出,您必须让这两个函数同步其操作。
如果调用 setup()
,则 Vertica 始终调用 destroy()
。取消并不能防止破坏。
有关实施 cancel()
的示例,请参阅 C++ 示例:可取消的 UDSource。
2 - 在执行期间检查是否已取消查询
您可以调用 isCanceled()
方法来检查用户是否已取消查询。通常,在开始执行代价高昂的操作之前,您会使用在 UDx 中进行主要处理的方法来检查是否已取消查询。如果 isCanceled()
返回 true,则表明查询已被取消,您的方法应立即退出以防止浪费 CPU 时间。如果 UDx 未在隔离模式下运行,则 Vertica 无法停止函数,并且必须等待函数完成。如果 UDx 在隔离模式下运行,Vertica 最终会终止运行它的从属进程。
有关使用 isCanceled()
的示例,请参阅 C++ 示例:可取消的 UDSource。
3 - C++ 示例:可取消的 UDSource
在 SDK 示例的 filelib.cpp
中找到的 FifoSource
示例演示了如何使用 cancel()
和 isCanceled()
。此源从指定的管道执行读取操作。与从文件读取不同,从管道读取可能会堵塞。因此,我们需要能够取消从此源加载数据。
为了管理取消操作,UDx 使用了管道,它是一种用于进程间通信的数据通道。某个进程可以将数据写入管道的写入端,并在另一个进程从管道的读取端读取数据之前保持可用。此示例不通过该管道传递数据;相反,它使用该管道来管理取消操作,如下面进一步所述。除了管道的两个文件描述符(每端一个)之外,UDx 还会为要从其读取的文件创建文件描述符。setup()
函数将创建管道,然后打开相应文件。
virtual void setup(ServerInterface &srvInterface) {
// cancelPipe is a pipe used only for checking cancellation
if (pipe(cancelPipe)) {
vt_report_error(0, "Error opening control structure");
}
// handle to the named pipe from which we read data
namedPipeFd = open(filename.c_str(), O_RDONLY | O_NONBLOCK);
if (namedPipeFd < 0) {
vt_report_error(0, "Error opening fifo [%s]", filename.c_str());
}
}
现在有三个文件描述符:namedPipeFd
、cancelPipe[PIPE_READ]
和 cancelPipe[PIPE_WRITE]
。上述每个描述符最终都必须关闭。
此 UDx 使用 poll()
系统调用来等待数据从指定的管道到达 (namedPipeFd
) 或等待取消查询 (cancelPipe[PIPE_READ]
)。process()
函数将执行轮询、检查结果、检查是否已取消查询、在需要时写入输出,然后返回结果。
virtual StreamState process(ServerInterface &srvInterface, DataBuffer &output) {
struct pollfd pollfds[2] = {
{ namedPipeFd, POLLIN, 0 },
{ cancelPipe[PIPE_READ], POLLIN, 0 }
};
if (poll(pollfds, 2, -1) < 0) {
vt_report_error(1, "Error reading [%s]", filename.c_str());
}
if (pollfds[1].revents & (POLLIN | POLLHUP)) {
/* This can only happen after cancel() has been called */
VIAssert(isCanceled());
return DONE;
}
VIAssert(pollfds[PIPE_READ].revents & (POLLIN | POLLHUP));
const ssize_t amount = read(namedPipeFd, output.buf + output.offset, output.size - output.offset);
if (amount < 0) {
vt_report_error(1, "Error reading from fifo [%s]", filename.c_str());
}
if (amount == 0 || isCanceled()) {
return DONE;
} else {
output.offset += amount;
return OUTPUT_NEEDED;
}
}
如果查询被取消,则 cancel()
函数会关闭管道的写入端。process()
下一次轮询输入时,它会在管道的读取端找不到输入时退出。否则,它会继续操作。此外,该函数还会调用 isCanceled()
以在返回 OUTPUT_NEEDED
(表示已填满缓冲区且正在等待下游处理的信号)之前检查是否已取消查询。
cancel()
函数仅执行中断对 process()
的调用所需的工作。相反,始终需要执行(而不仅仅是为了取消查询)的清理是在 destroy()
或析构函数中完成的。cancel()
函数会关闭管道的写入端。(稍后将显示 helper 函数。)
virtual void cancel(ServerInterface &srvInterface) {
closeIfNeeded(cancelPipe[PIPE_WRITE]);
}
在 cancel()
中关闭指定的管道并不安全,因为如果另一个进程(如另一个查询)要在 UDx 完成之前将文件描述符编号重用于新描述符,则关闭指定的管道可能会产生竞争条件。我们会改为在 destroy()
中关闭指定的管道以及管道的读取端。
virtual void destroy(ServerInterface &srvInterface) {
closeIfNeeded(namedPipeFd);
closeIfNeeded(cancelPipe[PIPE_READ]);
}
在 destroy()
中关闭管道的写入端并不安全,因为 cancel()
会关闭它且可以使用 destroy()
进行并发调用。因此,我们在析构函数中关闭管道的写入端。
~FifoSource() {
closeIfNeeded(cancelPipe[PIPE_WRITE]);
}
Udx 会使用 helper 函数 closeIfNeeded()
来确保每个文件描述符正好关闭一次。
void closeIfNeeded(int &fd) {
if (fd >= 0) {
close(fd);
fd = -1;
}
}