如何在 Rx.NET 中以正确的方式约束并发



请观察以下代码片段:

var result = await GetSource(1000).SelectMany(s => getResultAsync(s).ToObservable()).ToList();

此代码的问题在于getResultAsync以不受约束的方式并发运行。在某些情况下,这可能不是我们想要的。假设我想将其并发性限制为最多 10 个并发调用。Rx.NET 方法是什么?

我附上一个简单的控制台应用程序,它演示了所描述问题的主题和我的蹩脚解决方案。

还有一些额外的代码,比如Stats类和人工随机睡眠。他们在那里确保我真正获得并发执行,并且可以可靠地计算在此过程中达到的最大并发性。

该方法RunUnconstrained演示了朴素、不受约束的运行。RunConstrained的方法显示了我的解决方案,这不是很优雅。理想情况下,我想通过简单地将专用的 Rx 运算符应用于 Monad 来缓解并发约束。当然,在不牺牲性能的情况下。

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Reactive.Linq;
using System.Reactive.Threading.Tasks;
using System.Threading;
using System.Threading.Tasks;
namespace RxConstrainedConcurrency
{
    class Program
    {
        public class Stats
        {
            public int MaxConcurrentCount;
            public int CurConcurrentCount;
            public readonly object MaxConcurrentCountGuard = new object();
        }
        static void Main()
        {
            RunUnconstrained().GetAwaiter().GetResult();
            RunConstrained().GetAwaiter().GetResult();
        }
        static async Task RunUnconstrained()
        {
            await Run(AsyncOp);
        }
        static async Task RunConstrained()
        {
            using (var sem = new SemaphoreSlim(10))
            {
                await Run(async (s, pause, stats) =>
                {
                    // ReSharper disable AccessToDisposedClosure
                    await sem.WaitAsync();
                    try
                    {
                        return await AsyncOp(s, pause, stats);
                    }
                    finally
                    {
                        sem.Release();
                    }
                    // ReSharper restore AccessToDisposedClosure
                });
            }
        }
        static async Task Run(Func<string, int, Stats, Task<int>> getResultAsync)
        {
            var stats = new Stats();
            var rnd = new Random(0x1234);
            var result = await GetSource(1000).SelectMany(s => getResultAsync(s, rnd.Next(30), stats).ToObservable()).ToList();
            Debug.Assert(stats.CurConcurrentCount == 0);
            Debug.Assert(result.Count == 1000);
            Debug.Assert(!result.Contains(0));
            Debug.WriteLine("Max concurrency = " + stats.MaxConcurrentCount);
        }
        static IObservable<string> GetSource(int count)
        {
            return Enumerable.Range(1, count).Select(i => i.ToString()).ToObservable();
        }
        static Task<int> AsyncOp(string s, int pause, Stats stats)
        {
            return Task.Run(() =>
            {
                int cur = Interlocked.Increment(ref stats.CurConcurrentCount);
                if (stats.MaxConcurrentCount < cur)
                {
                    lock (stats.MaxConcurrentCountGuard)
                    {
                        if (stats.MaxConcurrentCount < cur)
                        {
                            stats.MaxConcurrentCount = cur;
                        }
                    }
                }
                try
                {
                    Thread.Sleep(pause);
                    return int.Parse(s);
                }
                finally
                {
                    Interlocked.Decrement(ref stats.CurConcurrentCount);
                }
            });
        }
    }
}

您可以在 Rx 中使用 Merge 的重载来限制内部可观察量的并发订阅数。

这种形式的Merge应用于流流。

通常,使用 SelectMany 从事件调用异步任务执行两项工作:它将每个事件投影到一个可观察的流中,该流是其单个事件是结果,并将所有生成的流平展在一起。

要使用Merge我们必须使用常规Select将每个事件投影到异步任务的调用中(从而创建流流(,并使用Merge来扁平化结果。它将以受约束的方式执行此操作,只需在任何时间点订阅提供的固定数量的内部流。

我们必须小心,只在订阅包装内部流时调用每个异步任务调用。将异步任务转换为具有ToObservable()的可观察量实际上会立即调用异步任务,而不是在订阅时调用,因此我们必须将评估推迟到使用 Observable.Defer 订阅。

下面是将所有这些步骤放在一起的示例:

void Main()
{
    var xs = Observable.Range(0, 10); // source events
    // "Double" here is our async operation to be constrained,
    // in this case to 3 concurrent invocations
    xs.Select(x =>
       Observable.Defer(() => Double(x).ToObservable())).Merge(3)
      .Subscribe(Console.WriteLine,
                 () => Console.WriteLine("Max: " + MaxConcurrent));

}
private static int Concurrent;
private static int MaxConcurrent;
private static readonly object gate = new Object();
public async Task<int> Double(int x)
{
    var concurrent = Interlocked.Increment(ref Concurrent);
    lock(gate)
    {
        MaxConcurrent = Math.Max(concurrent, MaxConcurrent);
    }
    await Task.Delay(TimeSpan.FromSeconds(1));
    Interlocked.Decrement(ref Concurrent);
    return x * 2;
}

此处的最大并发输出将为"3"。删除合并以"不受约束",您将获得"10"。

另一种(等效的(获得读起来更好的Defer效果的方法是使用 FromAsync 而不是 Defer + ToObservable

xs.Select(x => Observable.FromAsync(() => Double(x))).Merge(3)

相关内容

  • 没有找到相关文章

最新更新