我经常有一些应用程序,其中顶级函数的工作方式类似于
public Result5 ProcessAll() {
var result1 = Process1();
var result2 = Process2();
var result3 = Process3(result1);
var result4 = Process4(result1, result2);
return Process5(result1, result2, result3, result4);
}
流程* 函数的常见功能包括:
- IO 绑定(数据库、文件系统、Web 服务)
- 可能会引发刚刚在调用堆栈中传播的异常
- 可能会为一些非异常错误返回错误,这些错误应该停止处理并返回
顶级函数也在可以取消的后台线程上运行。这意味着完整的实现看起来像
public Result5 ProcessAll(CancellationToken cancellationToken) {
Result1 result1 = Process1();
if (result1 == null)
return null;
cancellationToken.ThrowIfCancellationRequested();
Result2 result2 = Process2();
if (result2 == null)
return null;
cancellationToken.ThrowIfCancellationRequested();
Result3 result3 = Process3(result1);
if (result3 == null)
return null;
cancellationToken.ThrowIfCancellationRequested();
Result4 result4 = Process4(result1, result2);
if (result4 == null)
return null;
cancellationToken.ThrowIfCancellationRequested();
return Process5(result1, result2, result3, result4);
}
现在让我们假设我需要通过尽可能多地并行运行来加快速度。
此外,假定 Process* 函数实现任务异步模式并使用 IO 完成端口或类似端口。
我无法为此找到任何好的模式。
如果我忽略错误/异常/取消,它将如下所示。
public Result5 ProcessAll(CancellationToken cancellationToken) {
Task<Result1> task1 = Process1Async();
Task<Result2> task2 = Process2Async();
Task<Result3> task3 = task1.ContinueWith(_ => Process3Async(task1.Result)).Unwrap();
Task<Result4> task4 = Task.Factory.ContinueWhenAll(new[] { task1, task2 },
_ => Process4Async(task1.Result, task2.Result)).Unwrap();
// This will trigger all exceptions captured
Task.WaitAll(new[] { task1, task2, task3, task4 });
return Process5(task1.Result, task2.Result, task3.Result, task4.Result);
}
(我知道这可以像运行 task4 同步一样进行优化,并且 WaitAll 不是必需的,但我只是在这里显示一种模式)
如果我现在尝试处理错误和异常,它可能如下所示:
public Result ProcessAll(CancellationToken cancellationToken) {
Task<Result1> task1 = Process1Async();
Task<Result2> task2 = Process2Async();
// Process 3 should not run if task1 or task2 failed or returned error
Task<Result3> task3 = task1.ContinueWith(_ => {
if (task1.IsFaulted || task1.Result == null)
return null;
if (task2.IsFaulted || (task2.IsCompleted && task2.Result == null)
return null;
return Process3Async(task1.Result);
}).Unwrap();
// Process4 should not start if any of Process1,Process2 or Process3 returned error or throw exception
Task<Result4> task4 = Task.Factory.ContinueWhenAll(new[] { task1, task2 }, _ => {
if (task1.Faulted || task1.Result == null)
return null;
if (task2.Faulted || task2.Result == null)
return null;
if (task3.Faulted || (task3.IsCompleted && task3.Result == null))
return null;
return Process4Async(task1.Result, task2.Result)).Unwrap();
Task.WaitAll(new[] { task1, task2, task3, task4 });
if (task1.Result == null ||
task2.Result == null ||
task3.Result == null ||
task4.Result == null)
return null;
return Process5(task1.Result, task2.Result, task3.Result, task4.Result);
}
现在我需要输入取消支票:-)
我现在的问题是:
在早期任务中,所有这些针对故障、错误和取消的检查都容易出错,并且可扩展性不强。我在这里错过了一些重要的东西并以错误的方式做吗?
我只能在所有早期进程完成后启动进程 3、4 和 5,反正你没有并行性。因此,您无需为它们使用任务。您只需要使用前两个任务,这使问题更容易。如果选择将它们作为任务启动,这些任务将始终等待前置任务,以这种方式删除任何并行性。