构造可取消的 Java 任务的 DAG



我想在Java中创建一个DAG,其中任务可能依赖于其他任务的输出。如果两个任务之间没有定向路径,则它们可以并行运行。任务可能会被取消。如果任何任务引发异常,则取消所有任务。

我想为此使用CompleteableFuture,但尽管实现了Future接口(包括Future.cancel(boolean)CompletableFuture不支持取消 -CompletableFuture.cancel(true)被简单地忽略了。(有人知道为什么吗?

因此,我正在使用Future构建自己的任务DAG。这是很多样板,而且很难正确处理。还有比这更好的方法吗?

下面是一个示例:

  1. 我想调用Process process = Runtime.getRuntime().exec(cmd)来启动命令行进程,创建一个Future<Process>。然后我想启动(扇出)三个子任务:
    • 一个使用来自process.getInputStream()输入的任务
    • 一个使用来自process.getErrorStream()输入的任务
    • 一个调用process.waitFor()的任务,然后等待结果。
  2. 然后我想等待所有三个启动的子任务完成(即扇入/完成障碍)。这应该在收集process.waitFor()任务返回的退出代码的最终Future<Integer> exitCode中收集。两个输入的消费者任务只是返回Void,所以它们的输出可以忽略,但完成障碍仍应等待它们的完成。

我希望任何启动的子任务失败,导致所有子任务被取消,底层进程被破坏。

请注意,第一步中的Process process = Runtime.getRuntime().exec(cmd)可能会引发异常,这应该会导致失败一直级联到exitCode

@FunctionalInterface
public static interface ConsumerThrowingIOException<T> {
public void accept(T val) throws IOException;
}
public static Future<Integer> exec(
ConsumerThrowingIOException<InputStream> stdoutConsumer,
ConsumerThrowingIOException<InputStream> stderrConsumer,
String... cmd) {
Future<Process> processFuture = executor.submit(
() -> Runtime.getRuntime().exec(cmd));
AtomicReference<Future<Void>> stdoutProcessorFuture = //
new AtomicReference<>();
AtomicReference<Future<Void>> stderrProcessorFuture = //
new AtomicReference<>();
AtomicReference<Future<Integer>> exitCodeFuture = //
new AtomicReference<>();
Runnable cancel = () -> {
try {
processFuture.get().destroy();
} catch (Exception e) {
// Ignore (exitCodeFuture.get() will still detect exceptions)
}
if (stdoutProcessorFuture.get() != null) {
stdoutProcessorFuture.get().cancel(true);
}
if (stderrProcessorFuture.get() != null) {
stderrProcessorFuture.get().cancel(true);
}
if (exitCodeFuture.get() != null) {
stderrProcessorFuture.get().cancel(true);
}
};
if (stdoutConsumer != null) {
stdoutProcessorFuture.set(executor.submit(() -> {
try {
InputStream inputStream = processFuture.get()
.getInputStream();
stdoutConsumer.accept(inputStream != null
? inputStream
: new ByteArrayInputStream(new byte[0]));
return null;
} catch (Exception e) {
cancel.run();
throw e;
}
}));
}
if (stderrConsumer != null) {
stderrProcessorFuture.set(executor.submit(() -> {
try {
InputStream errorStream = processFuture.get()
.getErrorStream();
stderrConsumer.accept(errorStream != null
? errorStream
: new ByteArrayInputStream(new byte[0]));
return null;
} catch (Exception e) {
cancel.run();
throw e;
}
}));
}
exitCodeFuture.set(executor.submit(() -> {
try {
return processFuture.get().waitFor();
} catch (Exception e) {
cancel.run();
throw e;
}
}));
// Async completion barrier -- wait for process to exit,
// and for output processors to complete
return executor.submit(() -> {
Exception exception = null;
int exitCode = 1;
try {
exitCode = exitCodeFuture.get().get();
} catch (InterruptedException | CancellationException
| ExecutionException e) {
cancel.run();
exception = e;
}
if (stderrProcessorFuture.get() != null) {
try {
stderrProcessorFuture.get().get();
} catch (InterruptedException | CancellationException
| ExecutionException e) {
cancel.run();
if (exception == null) {
exception = e;
} else if (e instanceof ExecutionException) {
exception.addSuppressed(e);
}
}
}
if (stdoutProcessorFuture.get() != null) {
try {
stdoutProcessorFuture.get().get();
} catch (InterruptedException | CancellationException
| ExecutionException e) {
cancel.run();
if (exception == null) {
exception = e;
} else if (e instanceof ExecutionException) {
exception.addSuppressed(e);
}
}
}
if (exception != null) {
throw exception;
} else {
return exitCode;
}
});
}

注意:我意识到Runtime.getRuntime().exec(cmd)应该是非阻塞的,所以不需要它自己的Future,但我还是用一个代码编写了代码,以说明DAG构造的观点。

没办法。Process 没有异步接口(Process.onExit() 除外)。因此,您必须使用线程来等待进程创建和从输入流读取。DAG 的其他组件可以是异步任务 (CompletableFutures)。

这不是一个大问题。与线程相比,异步任务的唯一优点是内存消耗更少。无论如何,您的进程会消耗大量内存,因此在这里节省内存没有多大意义。

最新更新