Stop Parallel.ForEachAsync



在C#中,我对停止Parallel.ForEachAsync循环感兴趣(考虑到StopBreak之间的差异(;对于Parallel.ForEach,我可以执行以下操作:

Parallel.ForEach(items, (item, state) =>
{
if (cancellationToken.IsCancellationRequested)
{
state.Stop();
return;
}
// some process on the item
Process(item);
});

但是,由于我有一个进程需要异步执行,所以我切换到了Parallel.ForEachAsyncForEachAsync没有Stop()方法,我可以按如下方式break循环,但我想知道这是否是打破循环的最有效方法(换句话说,循环在收到取消请求时需要尽快停止(。

await Parallel.ForEachAsync(items, async (item, state) =>
{
if (cancellationToken.IsCancellationRequested)
{
return;
}
// some async process on the item
await ProcessAsync(item);
});

Parallel.ForEachAsyncbody委托的第二个参数是CancellationToken。此令牌由API提供,它与您在ParallelOptions中传递的令牌不同。您可以将此令牌转发到lambda内部调用的任何异步方法。如果您调用不可取消的方法,那么您能做的最好的事情就是在lambda:内的战略位置调用ThrowIfCancellationRequested

CancellationTokenSource cts = new();
ParallelOptions options = new() { CancellationToken = cts.Token };
try
{
await Parallel.ForEachAsync(items, options, async (item, ct) =>
{
//...
ct.ThrowIfCancellationRequested();
//...
await ProcessAsync(item, ct);
//...
ct.ThrowIfCancellationRequested();
//...
});
}
catch (OperationCanceledException ex)
{
// ...
}

在lambda中作为参数提供的令牌,即上例中的ct,不仅在ParallelOptions.CancellationToken被取消时,而且在ProcessAsync操作失败的情况下也会被取消。此机制允许更快地传播异常。当出现错误时,并行循环不会立即完成,因为它遵循禁止即发即弃操作的原则。所有由循环内部启动的操作都必须在整个循环成功或失败之前完成。lambda中的令牌可以将此延迟降至最低。

您将需要这样的东西:

await Parallel.ForEachAsync(items, async (item, state) =>
{
await ProcessAsync(item, cancellationToken);
});

async Task ProcessAsync(string item, CancellationToken ct)
{
while (!ct.IsCancellationRequested)
{
//Process
}
}

最新更新