如何将异步任务与取消令牌链接起来?



我想链接一些任务,但如果CancellationToken没有被触发,则有条件地继续执行。

我的目标是实现的等同于

var cts = new CancellationTokenSource();
var cancellationToken = cts.Token;
var t = Task.Run(async () => {
if (cancellationToken.IsCancellationRequested) return;
await t1();
if (cancellationToken.IsCancellationRequested) return;
await t2();
if (cancellationToken.IsCancellationRequested) return;
await t3();
if (cancellationToken.IsCancellationRequested) return;
await t4();
});
var timeout = Task.Delay(TimeSpan.FromSeconds(4));
var completedTask = await Task.WhenAny(t, timeout);
if (completedTask != t)
{
cts.Cancel();
await t;
}

这就是我现在所拥有的,它正在工作,尽管它也很冗长。

var cts = new CancellationTokenSource();
var t = Task.Run(async () => {
await t1();
await t2();
await t3();
await t4();
}, cts.Token);
cts.CancelAfter(TimeSpan.FromSeconds(4));
try
{
await t;
}
catch (OperationCanceledException)
{
// The cancellation token was triggered, add logic for that
....
}

你的原始代码是正确的——它假设你总是希望各个任务运行完成,如果取消,你希望整个任务成功完成。 这些都不是惯用语。

一种更正常的方法是:

var cts = new CancellationTokenSource();
var cancellationToken = cts.Token;
var t = Task.Run(async () => {
cancellationToken.ThrowIfCancellationRequested();
await t1(cancellationToken);
cancellationToken.ThrowIfCancellationRequested();
await t2(cancellationToken);
cancellationToken.ThrowIfCancellationRequested();
await t3(cancellationToken);
cancellationToken.ThrowIfCancellationRequested();
await t4(cancellationToken);
}, cancellationToken);

然后,在其他地方:

cts.Cancel();

您可以在此处省略对ThrowIfCancellationRequested的调用,假设各个任务在进入后不久就会检查它,但核心思想是您应该将令牌传递到正在执行工作的任何内容的最内部循环,并且它们应该通过调用它来取消,这最终将任务设置为取消状态而不是成功状态。

(所以实际上,如果你点击了一个不接受CancellationToken参数的函数,你只需要实际调用ThrowIfCancellationRequested- 这就是为什么所有异步方法都应该这样做,否则它的任务将是不可取消的。

如果整个操作花费的时间超过 4 秒,您的目标似乎是停止执行。

如果你把CancellationToken传递给你的t1/t2/等方法,我会说你不能比你所拥有的做得更好。但是,正如您拥有的那样,您可以只使用Stopwatch而不是CancellationToken

var timeout = TimeSpan.FromSeconds(4);
var stopwatch = new Stopwatch();
stopwatch.Start();
await t1();
if (stopwatch.Elapsed > timeout) return;
await t2();
if (stopwatch.Elapsed > timeout) return;
await t3();
if (stopwatch.Elapsed > timeout) return;
await t4();
stopwatch.Stop();

我假设这是在某个地方的方法中,您可以在其中使用return,但您可以根据需要进行修改(返回值、抛出异常等(。

您的代码在功能上是正常的,但是当一目了然地阅读它时,不清楚它在做什么。因此,我建议您将此逻辑封装在具有描述性名称和参数的实用程序方法中:

public static async Task RunSequentially(IEnumerable<Func<Task>> taskFactories,
int timeout = Timeout.Infinite, bool onTimeoutAwaitIncompleteTask = false)
{
using (var cts = new CancellationTokenSource(timeout))
{
if (onTimeoutAwaitIncompleteTask)
{
await Task.Run(async () =>
{
foreach (var taskFactory in taskFactories)
{
if (cts.IsCancellationRequested) throw new TimeoutException();
await taskFactory();
}
});
}
else // On timeout return immediately
{
var allSequentially = Task.Run(async () =>
{
foreach (var taskFactory in taskFactories)
{
cts.Token.ThrowIfCancellationRequested();
var task = taskFactory(); // Synchronous part of task
cts.Token.ThrowIfCancellationRequested();
await task; // Asynchronous part of task
}
}, cts.Token);
var timeoutTask = new Task(() => {}, cts.Token);
var completedTask = await Task.WhenAny(allSequentially, timeoutTask);
if (completedTask.IsCanceled) throw new TimeoutException();
await completedTask; // Propagate any exception
}
}
}

此代码延迟于您的代码,因为它会在超时时引发TimeoutException。我认为最好强制调用方显式处理此异常,而不是隐藏操作超时的事实。调用方可以通过将 catch 块留空来忽略异常:

try
{
await RunSequentially(new[] { t1, t2, t3, t4 },
timeout: 4000,
onTimeoutAwaitIncompleteTask: true);
}
catch (TimeoutException)
{
// Do nothing
}

你应该考虑使用 Microsoft 的反应式框架(又名 Rx( - NuGetSystem.Reactive并添加using System.Reactive.Linq;- 然后你可以这样做:

IObservable<Unit> tasks =
from x1 in Observable.FromAsync(() => t1())
from x2 in Observable.FromAsync(() => t2())
from x3 in Observable.FromAsync(() => t3())
from x4 in Observable.FromAsync(() => t4())
select Unit.Default;
IObservable<Unit> timer =
Observable
.Timer(TimeSpan.FromSeconds(4.0))
.Select(x => Unit.Default)
IDisposable subscription =
Observable
.Amb(tasks, timer)
.Subscribe();

如果计时器可观察对象在任务完成之前触发,则取消整个管道。不会运行不必要的任务。

如果要手动取消,只需致电subscription.Dispose()

代码也简单美观。

既然这里没有人给出更好的答案,我就自己做。

在我看来,我在问题中提供的代码是最有凝聚力的,也足够通用,可以在其他情况下回收。

最新更新