Nesting await in Parallel.ForEach



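In a Metro app, I need to execute a number of WCF calls. There are a significant number of calls to be made, so I need to do them in a parallel loop. The problem is that the parallel loop exits before the WCF calls are all complete.

How would you refactor this to work as expected?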

var ids = new List<string>() { "1", "2", "3", "4", "5", "6", "7", "8", "9", "10" };
var customers = new  System.Collections.Concurrent.BlockingCollection<Customer>();
Parallel.ForEach(ids, async i =>
{
    ICustomerRepo repo = new CustomerRepo();
    var cust = await repo.GetCustomer(i);
    customers.Add(cust);
});
foreach ( var customer in customers )
{
    Console.WriteLine(customer.ID);
}
Console.ReadKey();

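The whole idea behind Parallel.ForEach() is that you have a set of threads and each thread processes part of the collection. As you noticed, this doesn't work with async-await, where you want to release the thread for the duration of the async call. Parallel.ForEach() only accepts synchronous delegates such as Action<T>, so your async lambda is compiled as an async void method: each invocation returns to the loop at its first await, and the loop finishes without waiting for the WCF calls.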
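A small self-contained sketch (not taken from any of the answers) that makes the early exit observable. Each body waits on a TaskCompletionSource that is only completed after Parallel.ForEach has returned, so the count read right after the loop is deterministically zero. AsyncVoidDemo and RunAsync are hypothetical names used only for illustration.

```csharp
using System.Threading;
using System.Threading.Tasks;

public static class AsyncVoidDemo
{
    // Returns how many bodies had finished right after Parallel.ForEach returned,
    // and how many finished in the end.
    public static async Task<(int AfterForEach, int Eventually)> RunAsync()
    {
        var gate = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
        int completed = 0;

        // The async lambda binds to Action<int>, so each body is "async void":
        // it hands control back to Parallel.ForEach at its first await.
        Parallel.ForEach(new[] { 1, 2, 3 }, async i =>
        {
            await gate.Task;                      // cannot finish before the gate opens
            Interlocked.Increment(ref completed);
        });

        // Parallel.ForEach has already returned, yet no body has completed.
        int afterForEach = Volatile.Read(ref completed);

        gate.SetResult(true);
        // There is no Task to await for async void bodies, so poll until they finish.
        while (Volatile.Read(ref completed) < 3)
            await Task.Delay(10);

        return (afterForEach, Volatile.Read(ref completed));
    }
}
```

The polling loop at the end is exactly the awkwardness the question runs into: once the lambda is async void, the caller has no handle to wait on.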

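You could "fix" that by blocking the ForEach() threads, but that defeats the whole point of async-await.

What you could do is to use TPL Dataflow instead of Parallel.ForEach(), which supports asynchronous Tasks well.

Specifically, your code could be written using a TransformBlock that transforms each id into a Customer using the async lambda. This block can be configured to execute in parallel. You would link that block to an ActionBlock that writes each Customer to the console. After you set up the block network, you can Post() each id to the TransformBlock.

In code: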

using System.Threading.Tasks.Dataflow; // TPL Dataflow (a NuGet package on .NET Framework)

var ids = new List<string> { "1", "2", "3", "4", "5", "6", "7", "8", "9", "10" };
var getCustomerBlock = new TransformBlock<string, Customer>(
    async i =>
    {
        ICustomerRepo repo = new CustomerRepo();
        return await repo.GetCustomer(i);
    }, new ExecutionDataflowBlockOptions
    {
        MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded
    });
var writeCustomerBlock = new ActionBlock<Customer>(c => Console.WriteLine(c.ID));
getCustomerBlock.LinkTo(
    writeCustomerBlock, new DataflowLinkOptions
    {
        PropagateCompletion = true
    });
foreach (var id in ids)
    getCustomerBlock.Post(id);
getCustomerBlock.Complete();
writeCustomerBlock.Completion.Wait();

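That said, you probably want to limit the parallelism of the TransformBlock to some small constant. Also, you could limit the capacity of the TransformBlock and add the items to it asynchronously using SendAsync(), for example if the collection is too big.

An added benefit compared to your code (if it worked) is that the writing will start as soon as a single item is finished, instead of waiting until all of the processing is finished.

Svick's answer is (as usual) excellent.

However, I find Dataflow to be more useful when you actually have large amounts of data to transfer, or when you need an async-compatible queue.

In your case, a simpler solution is to just use async-style parallelism: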
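A sketch of both suggestions, assuming a hypothetical FetchAsync stand-in for repo.GetCustomer that returns a string instead of a Customer. MaxDegreeOfParallelism is set to a small constant, BoundedCapacity caps how many items the block holds at once, and SendAsync waits asynchronously for room where Post would simply return false. The values 4 and 8 are arbitrary.

```csharp
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class BoundedPipeline
{
    // Hypothetical stand-in for repo.GetCustomer(i): simulates a remote call.
    static async Task<string> FetchAsync(string id)
    {
        await Task.Delay(10);
        return "customer " + id;
    }

    public static async Task<List<string>> RunAsync(IEnumerable<string> ids)
    {
        var results = new List<string>();
        var fetchBlock = new TransformBlock<string, string>(
            id => FetchAsync(id),
            new ExecutionDataflowBlockOptions
            {
                MaxDegreeOfParallelism = 4, // small constant instead of Unbounded
                BoundedCapacity = 8         // at most 8 items held by the block at once
            });
        // ActionBlock runs one item at a time by default, so List.Add is not called concurrently.
        var writeBlock = new ActionBlock<string>(c => results.Add(c));
        fetchBlock.LinkTo(writeBlock, new DataflowLinkOptions { PropagateCompletion = true });

        foreach (var id in ids)
        {
            // While the block is full, SendAsync waits asynchronously instead of rejecting the id.
            await fetchBlock.SendAsync(id);
        }
        fetchBlock.Complete();
        await writeBlock.Completion;
        return results;
    }
}
```

Because TransformBlock preserves input order by default, the results list follows the order of ids even though the fetches overlap.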

var ids = new List<string>() { "1", "2", "3", "4", "5", "6", "7", "8", "9", "10" };
var customerTasks = ids.Select(i =>
  {
    ICustomerRepo repo = new CustomerRepo();
    return repo.GetCustomer(i);
  });
var customers = await Task.WhenAll(customerTasks);
foreach (var customer in customers)
{
  Console.WriteLine(customer.ID);
}
Console.ReadKey();
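To see the two properties this relies on in isolation, here is a minimal sketch with a hypothetical SquareAsync in place of the WCF call: Select starts every operation without blocking a thread, and Task.WhenAll returns the results in the order of the input tasks, even though the later items finish first here.

```csharp
using System.Linq;
using System.Threading.Tasks;

public static class WhenAllDemo
{
    // Hypothetical stand-in for an async call; larger inputs finish sooner.
    static async Task<int> SquareAsync(int x)
    {
        await Task.Delay(50 - x * 10);
        return x * x;
    }

    public static async Task<int[]> RunAsync()
    {
        // Select is lazy; Task.WhenAll enumerates it once, starting all four calls.
        var tasks = new[] { 1, 2, 3, 4 }.Select(x => SquareAsync(x));
        // Results come back in input order, not completion order.
        return await Task.WhenAll(tasks);
    }
}
```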

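Using DataFlow as svick suggested may be overkill, and Stephen's answer does not provide the means to control the concurrency of the operation. However, that can be achieved rather simply: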

public static async Task RunWithMaxDegreeOfConcurrency<T>(
     int maxDegreeOfConcurrency, IEnumerable<T> collection, Func<T, Task> taskFactory)
{
    var activeTasks = new List<Task>(maxDegreeOfConcurrency);
    foreach (var task in collection.Select(taskFactory))
    {
        activeTasks.Add(task);
        if (activeTasks.Count == maxDegreeOfConcurrency)
        {
            await Task.WhenAny(activeTasks.ToArray());
            //observe exceptions here
            activeTasks.RemoveAll(t => t.IsCompleted); 
        }
    }
    await Task.WhenAll(activeTasks.ToArray()).ContinueWith(t => 
    {
        //observe exceptions in a manner consistent with the above   
    });
}

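The ToArray() calls can be optimized by using an array instead of a list and replacing completed tasks, but I doubt it would make much of a difference in most scenarios. Sample usage per the OP's question: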

await RunWithMaxDegreeOfConcurrency(10, ids, async i =>
{
    ICustomerRepo repo = new CustomerRepo();
    var cust = await repo.GetCustomer(i);
    customers.Add(cust);
});
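A sketch of the same technique with one change, assuming you want failures to surface instead of being observed manually: each finished task is awaited after Task.WhenAny, so its exception is rethrown. ThrottleSketch and MeasurePeakAsync are hypothetical names; the latter records the highest number of bodies running at once, which is a simple way to check that the cap holds.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class ThrottleSketch
{
    // Same "list of active tasks + Task.WhenAny" idea as RunWithMaxDegreeOfConcurrency,
    // except that each finished task is awaited, so a failure is rethrown instead of ignored.
    public static async Task RunAsync<T>(int maxConcurrency, IEnumerable<T> items, Func<T, Task> taskFactory)
    {
        var active = new List<Task>(maxConcurrency);
        foreach (var item in items)
        {
            active.Add(taskFactory(item));
            if (active.Count == maxConcurrency)
            {
                Task finished = await Task.WhenAny(active);
                active.Remove(finished);
                await finished; // throws if this task faulted or was canceled
            }
        }
        await Task.WhenAll(active);
    }

    // Runs 10 simulated operations with a cap of 3 and reports the peak concurrency observed.
    public static async Task<int> MeasurePeakAsync()
    {
        var sync = new object();
        int current = 0, peak = 0;
        await RunAsync(3, Enumerable.Range(0, 10), async _ =>
        {
            lock (sync) { current++; peak = Math.Max(peak, current); }
            await Task.Delay(20);
            lock (sync) { current--; }
        });
        return peak;
    }
}
```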

EDIT: Fellow SO user and TPL wiz Eli Arbel pointed me to a related article from Stephen Toub. As usual, his implementation is both elegant and efficient:

public static Task ForEachAsync<T>(
      this IEnumerable<T> source, int dop, Func<T, Task> body) 
{ 
    return Task.WhenAll( 
        from partition in Partitioner.Create(source).GetPartitions(dop) 
        select Task.Run(async delegate { 
            using (partition) 
                while (partition.MoveNext()) 
                    await body(partition.Current).ContinueWith(t => 
                          {
                              //observe exceptions
                          });
                      
        })); 
}
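A sketch of Toub's partitioning approach with the ContinueWith removed, so an exception from body faults the returned task instead of being swallowed (the failing worker stops draining its partition, while the other workers keep going). The class name PartitionerSketch is hypothetical. A quick way to check it is to collect the processed items into a ConcurrentBag and compare them with the source: every item should appear exactly once.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class PartitionerSketch
{
    // dop workers, each draining its own partition of the shared source sequentially.
    public static Task ForEachAsync<T>(this IEnumerable<T> source, int dop, Func<T, Task> body)
    {
        return Task.WhenAll(
            from partition in Partitioner.Create(source).GetPartitions(dop)
            select Task.Run(async () =>
            {
                using (partition)
                    while (partition.MoveNext())
                        await body(partition.Current); // exceptions propagate to WhenAll
            }));
    }
}
```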

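You can save effort with the new AsyncEnumerator NuGet package, which didn't exist 4 years ago when the question was originally posted. It allows you to control the degree of parallelism: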

using System.Collections.Async;
...
await ids.ParallelForEachAsync(async i =>
{
    ICustomerRepo repo = new CustomerRepo();
    var cust = await repo.GetCustomer(i);
    customers.Add(cust);
},
maxDegreeOfParallelism: 10);

Disclaimer: I'm the author of the AsyncEnumerator library, which is open source and licensed under MIT, and I'm posting this message just to help the community.

Wrap the Parallel.ForEach in a Task.Run(), and instead of the await keyword use [yourasyncmethod].Result.

(You need to do the Task.Run thing so as not to block the UI thread.)

Something like this:

var yourForeachTask = Task.Run(() =>
        {
            Parallel.ForEach(ids, i =>
            {
                ICustomerRepo repo = new CustomerRepo();
                var cust = repo.GetCustomer(i).Result;
                customers.Add(cust);
            });
        });
await yourForeachTask;

This should be pretty efficient, and easier than getting the whole TPL Dataflow working:

var customers = await ids.SelectAsync(async i =>
{
    ICustomerRepo repo = new CustomerRepo();
    return await repo.GetCustomer(i);
});
...
public static async Task<IList<TResult>> SelectAsync<TSource, TResult>(this IEnumerable<TSource> source, Func<TSource, Task<TResult>> selector, int maxDegreesOfParallelism = 4)
{
    var results = new List<TResult>();
    var activeTasks = new HashSet<Task<TResult>>();
    foreach (var item in source)
    {
        activeTasks.Add(selector(item));
        if (activeTasks.Count >= maxDegreesOfParallelism)
        {
            var completed = await Task.WhenAny(activeTasks);
            activeTasks.Remove(completed);
            results.Add(completed.Result);
        }
    }
    results.AddRange(await Task.WhenAll(activeTasks));
    return results;
}
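Note that the SelectAsync above adds results in completion order, not source order. If order matters, one option is a SemaphoreSlim-throttled projection followed by Task.WhenAll, which returns results in the order of its input tasks. SelectInOrderAsync is a hypothetical name, and this sketch creates one (waiting) task per item up front, so it is not meant for very large sources.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class OrderedSelectExtensions
{
    // Hypothetical helper: throttled like SelectAsync, but results keep the source order.
    public static async Task<TResult[]> SelectInOrderAsync<TSource, TResult>(
        this IEnumerable<TSource> source,
        Func<TSource, Task<TResult>> selector,
        int maxDegreeOfParallelism = 4)
    {
        using (var throttler = new SemaphoreSlim(maxDegreeOfParallelism))
        {
            // ToList starts one task per item; each waits for a free slot before calling selector.
            List<Task<TResult>> tasks = source.Select(async item =>
            {
                await throttler.WaitAsync();
                try { return await selector(item); }
                finally { throttler.Release(); }
            }).ToList();
            // Task.WhenAll returns results in the order of the tasks list, i.e. source order.
            return await Task.WhenAll(tasks);
        }
    }
}
```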

An extension method that makes use of SemaphoreSlim and also allows setting the maximum degree of parallelism:

/// <summary>Concurrently executes async actions for each item of
/// <see cref="IEnumerable{T}"/>.</summary>
/// <typeparam name="T">Type of the items in the sequence.</typeparam>
/// <param name="enumerable">Instance of <see cref="IEnumerable{T}"/>.</param>
/// <param name="action">An async delegate to execute for each item.</param>
/// <param name="maxDegreeOfParallelism">Optional. An integer that represents the
/// maximum degree of parallelism; must be greater than 0.</param>
/// <returns>A Task representing the async operation.</returns>
/// <exception cref="ArgumentOutOfRangeException">If maxDegreeOfParallelism
/// is less than 1.</exception>
public static async Task ForEachAsyncConcurrent<T>(
    this IEnumerable<T> enumerable,
    Func<T, Task> action,
    int? maxDegreeOfParallelism = null)
{
    if (maxDegreeOfParallelism.HasValue)
    {
        using (var semaphoreSlim = new SemaphoreSlim(
            maxDegreeOfParallelism.Value, maxDegreeOfParallelism.Value))
        {
            var tasksWithThrottler = new List<Task>();
            foreach (var item in enumerable)
            {
                // Increment the number of currently running tasks and wait if they
                // are more than the limit.
                await semaphoreSlim.WaitAsync();
                tasksWithThrottler.Add(Task.Run(async () =>
                {
                    try
                    {
                        await action(item);
                    }
                    finally
                    {
                        // The action has completed (or failed), so decrement the
                        // number of currently running tasks.
                        semaphoreSlim.Release();
                    }
                }));
            }
            // Wait for all tasks to complete; exceptions thrown by action propagate here.
            await Task.WhenAll(tasksWithThrottler.ToArray());
        }
    }
    else
    {
        await Task.WhenAll(enumerable.Select(item => action(item)));
    }
}

Example usage:

await enumerable.ForEachAsyncConcurrent(
    async item =>
    {
        await SomeAsyncMethod(item);
    },
    5);

I'm a little late to the party, but you may want to consider using GetAwaiter().GetResult() to run your async code synchronously, while still processing the items in parallel, as below:

Parallel.ForEach(ids, i =>
{
    ICustomerRepo repo = new CustomerRepo();
    // Blocks the thread that the Parallel loop is using for this item.
    var cust = repo.GetCustomer(i).GetAwaiter().GetResult();
    customers.Add(cust);
});
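Both .Result and GetAwaiter().GetResult() block the calling thread; the practical difference is how a failure surfaces. A minimal sketch with a hypothetical FailAsync method: .Result wraps the exception in an AggregateException, while GetAwaiter().GetResult() rethrows the original exception. Run it from a console app or a thread-pool thread; on a UI thread, blocking on a continuation like this can deadlock.

```csharp
using System;
using System.Threading.Tasks;

public static class BlockingDemo
{
    // Hypothetical async method that fails after yielding once.
    static async Task<int> FailAsync()
    {
        await Task.Yield();
        throw new InvalidOperationException("boom");
    }

    // Returns the exception types observed through .Result and GetAwaiter().GetResult().
    public static (Type ViaResult, Type ViaGetResult) Compare()
    {
        Type viaResult = null, viaGetResult = null;
        try { _ = FailAsync().Result; }
        catch (Exception ex) { viaResult = ex.GetType(); }
        try { _ = FailAsync().GetAwaiter().GetResult(); }
        catch (Exception ex) { viaGetResult = ex.GetType(); }
        return (viaResult, viaGetResult);
    }
}
```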

After introducing a bunch of helper methods, you will be able to run parallel queries with this simple syntax:

const int DegreeOfParallelism = 10;
IEnumerable<double> result = await Enumerable.Range(0, 1000000)
    .Split(DegreeOfParallelism)
    .SelectManyAsync(async i => await CalculateAsync(i).ConfigureAwait(false))
    .ConfigureAwait(false);

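What happens here: we split the source collection into 10 chunks (.Split(DegreeOfParallelism)), then run 10 tasks, each processing its items one by one (.SelectManyAsync(...)), and merge the results back into a single list.

It's worth mentioning that there is a simpler approach: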

double[] result2 = await Enumerable.Range(0, 1000000)
    .Select(async i => await CalculateAsync(i).ConfigureAwait(false))
    .WhenAll()
    .ConfigureAwait(false);

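But it needs a precaution: if the source collection is too big, it will schedule a Task for every item right away, which may cause a significant performance hit.

The extension methods used in the examples above look as follows: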

public static class CollectionExtensions
{
    /// <summary>
    /// Splits collection into number of collections of nearly equal size.
    /// </summary>
    public static IEnumerable<List<T>> Split<T>(this IEnumerable<T> src, int slicesCount)
    {
        if (slicesCount <= 0) throw new ArgumentOutOfRangeException(nameof(slicesCount));
        List<T> source = src.ToList();
        var sourceIndex = 0;
        for (var targetIndex = 0; targetIndex < slicesCount; targetIndex++)
        {
            var list = new List<T>();
            int itemsLeft = source.Count - targetIndex;
            while (slicesCount * list.Count < itemsLeft)
            {
                list.Add(source[sourceIndex++]);
            }
            yield return list;
        }
    }
    /// <summary>
    /// Takes collection of collections, projects those in parallel and merges results.
    /// </summary>
    public static async Task<IEnumerable<TResult>> SelectManyAsync<T, TResult>(
        this IEnumerable<IEnumerable<T>> source,
        Func<T, Task<TResult>> func)
    {
        List<TResult>[] slices = await source
            .Select(async slice => await slice.SelectListAsync(func).ConfigureAwait(false))
            .WhenAll()
            .ConfigureAwait(false);
        return slices.SelectMany(s => s);
    }
    /// <summary>Runs selector and awaits results.</summary>
    public static async Task<List<TResult>> SelectListAsync<TSource, TResult>(this IEnumerable<TSource> source, Func<TSource, Task<TResult>> selector)
    {
        List<TResult> result = new List<TResult>();
        foreach (TSource source1 in source)
        {
            TResult result1 = await selector(source1).ConfigureAwait(false);
            result.Add(result1);
        }
        return result;
    }
    /// <summary>Wraps tasks with Task.WhenAll.</summary>
    public static Task<TResult[]> WhenAll<TResult>(this IEnumerable<Task<TResult>> source)
    {
        return Task.WhenAll<TResult>(source);
    }
}
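Why Split produces slices of nearly equal size: with n = source.Count and k = slicesCount, the inner while loop keeps adding items while k · list.Count < n − t, so slice t (0-based) ends with the smallest count satisfying k · count ≥ n − t:

```latex
\lvert \text{slice}_t \rvert = \left\lceil \frac{n - t}{k} \right\rceil \quad (t = 0, \dots, k-1),
\qquad
\sum_{t=0}^{k-1} \left\lceil \frac{n - t}{k} \right\rceil = n
```

For example, n = 10 and k = 3 give slices of 4, 3 and 3 items. The sum equals n by Hermite's identity, so every item lands in exactly one slice and slice sizes differ by at most one (slices are empty when n < k).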
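
The problem of parallelizing asynchronous operations has been solved with the introduction of the Parallel.ForEachAsync API in .NET 6, but people who are using older .NET platforms might still need a decent substitute. An easy way to implement one is to use an ActionBlock<T> component from the TPL Dataflow library. This library is included in the standard .NET libraries (.NET Core and .NET 5+), and is available as a NuGet package for the .NET Framework. Here is how it can be used: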

public static Task Parallel_ForEachAsync<T>(ICollection<T> source,
    int maxDegreeOfParallelism, Func<T, Task> action)
{
    var options = new ExecutionDataflowBlockOptions();
    options.MaxDegreeOfParallelism = maxDegreeOfParallelism;
    var block = new ActionBlock<T>(action, options);
    foreach (var item in source) block.Post(item);
    block.Complete();
    return block.Completion;
}

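This solution is only suitable for materialized source sequences, hence the type of the parameter is ICollection<T> instead of the more common IEnumerable<T>. It also has the surprising behavior of ignoring any OperationCanceledException thrown by the action. Addressing these nuances and attempting to replicate precisely the features and behavior of Parallel.ForEachAsync is doable, but it requires almost as much code as using more primitive tools. I've posted such an attempt in the 9th revision of this answer.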
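On .NET 6 or later, the built-in API can be called directly. A minimal sketch, assuming a hypothetical GetCustomerAsync that returns a string in place of the OP's repository call. The body parameter is a Func<TSource, CancellationToken, ValueTask>, which an async lambda satisfies, and the returned task completes only after every body has finished.

```csharp
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class Net6ForEachAsyncExample
{
    // Hypothetical stand-in for repo.GetCustomer(i), honoring the loop's cancellation token.
    static async Task<string> GetCustomerAsync(string id, CancellationToken ct)
    {
        await Task.Delay(10, ct);
        return "customer " + id;
    }

    public static async Task<string[]> RunAsync(string[] ids)
    {
        var customers = new ConcurrentDictionary<string, string>();
        var options = new ParallelOptions { MaxDegreeOfParallelism = 3 };
        // Requires .NET 6+: at most 3 bodies run at once.
        await Parallel.ForEachAsync(ids, options, async (id, ct) =>
        {
            customers[id] = await GetCustomerAsync(id, ct);
        });
        // Re-read in source order, since bodies may finish in any order.
        return ids.Select(id => customers[id]).ToArray();
    }
}
```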


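Below is a different attempt to implement the Parallel.ForEachAsync method, offering exactly the same functionality as the .NET 6 API and mimicking its behavior as much as possible. It uses only basic TPL tools. The idea is to create a number of worker tasks equal to the desired MaxDegreeOfParallelism, with each task enumerating the same enumerator in a synchronized fashion. This is similar to how Parallel.ForEachAsync is implemented internally. The difference is that the .NET 6 API starts with a single worker and progressively adds more, while the implementation below creates all the workers from the start: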

public static Task Parallel_ForEachAsync<T>(IEnumerable<T> source,
    ParallelOptions parallelOptions,
    Func<T, CancellationToken, Task> body)
{
    if (source == null) throw new ArgumentNullException("source");
    if (parallelOptions == null) throw new ArgumentNullException("parallelOptions");
    if (body == null) throw new ArgumentNullException("body");
    int dop = parallelOptions.MaxDegreeOfParallelism;
    if (dop < 0) dop = Environment.ProcessorCount;
    CancellationToken cancellationToken = parallelOptions.CancellationToken;
    TaskScheduler scheduler = parallelOptions.TaskScheduler ?? TaskScheduler.Current;
    IEnumerator<T> enumerator = source.GetEnumerator();
    CancellationTokenSource cts = CancellationTokenSource
        .CreateLinkedTokenSource(cancellationToken);
    var semaphore = new SemaphoreSlim(1, 1); // Synchronizes the enumeration
    var workerTasks = new Task[dop];
    for (int i = 0; i < dop; i++)
    {
        workerTasks[i] = Task.Factory.StartNew(async () =>
        {
            try
            {
                while (true)
                {
                    if (cts.IsCancellationRequested)
                    {
                        cancellationToken.ThrowIfCancellationRequested();
                        break;
                    }
                    T item;
                    await semaphore.WaitAsync(); // Continue on captured context.
                    try
                    {
                        if (!enumerator.MoveNext()) break;
                        item = enumerator.Current;
                    }
                    finally { semaphore.Release(); } 
                    await body(item, cts.Token); // Continue on captured context.
                }
            }
            catch { cts.Cancel(); throw; }
        }, CancellationToken.None, TaskCreationOptions.DenyChildAttach, scheduler)
            .Unwrap();
    }
    return Task.WhenAll(workerTasks).ContinueWith(t =>
    {
        // Clean up
        try { semaphore.Dispose(); cts.Dispose(); } finally { enumerator.Dispose(); }
        return t;
    }, CancellationToken.None, TaskContinuationOptions.DenyChildAttach |
        TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default).Unwrap();
}

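The signature is different. The body parameter is of type Func<TSource, CancellationToken, Task> instead of Func<TSource, CancellationToken, ValueTask>. This is because value tasks are a relatively recent feature and are not built into the .NET Framework.

The behavior is also different. This implementation reacts to an OperationCanceledException thrown by the body by completing as canceled. The correct behavior would be to propagate these exceptions as individual errors and complete as faulted. Fixing this minor flaw is doable, but I'd rather not complicate this relatively short and readable implementation any further.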

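A simple native way without Parallel.ForEach or TPL Dataflow:

int totalThreads = 0; int maxThreads = 3;
var pendingTasks = new List<Task>();
foreach (var item in yourList)
{
    // Poll until a slot frees up (totalThreads counts running tasks, not threads).
    while (Volatile.Read(ref totalThreads) >= maxThreads) await Task.Delay(500);
    Interlocked.Increment(ref totalThreads);
    pendingTasks.Add(MyAsyncTask(item).ContinueWith((res) => Interlocked.Decrement(ref totalThreads)));
}
// Wait for the remaining tasks; otherwise the loop returns before they finish.
await Task.WhenAll(pendingTasks);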

You can test this solution with the following task:

async static Task MyAsyncTask(string item)
{
    await Task.Delay(2500);
    Console.WriteLine(item);
}
