请观察以下代码片段:
var result = await GetSource(1000).SelectMany(s => getResultAsync(s).ToObservable()).ToList();
此代码的问题在于getResultAsync
以不受约束的方式并发运行。在某些情况下,这可能不是我们想要的。假设我想将其并发性限制为最多 10 个并发调用。Rx.NET 方法是什么?
我附上一个简单的控制台应用程序,它演示了所描述问题的主题和我的蹩脚解决方案。
还有一些额外的代码,比如Stats
类和人工随机睡眠。他们在那里确保我真正获得并发执行,并且可以可靠地计算在此过程中达到的最大并发性。
该方法RunUnconstrained
演示了朴素、不受约束的运行。RunConstrained
的方法显示了我的解决方案,这不是很优雅。理想情况下,我想通过简单地将专用的 Rx 运算符应用于 Monad 来缓解并发约束。当然,在不牺牲性能的情况下。
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Reactive.Linq;
using System.Reactive.Threading.Tasks;
using System.Threading;
using System.Threading.Tasks;
namespace RxConstrainedConcurrency
{
class Program
{
public class Stats
{
public int MaxConcurrentCount;
public int CurConcurrentCount;
public readonly object MaxConcurrentCountGuard = new object();
}
static void Main()
{
RunUnconstrained().GetAwaiter().GetResult();
RunConstrained().GetAwaiter().GetResult();
}
static async Task RunUnconstrained()
{
await Run(AsyncOp);
}
static async Task RunConstrained()
{
using (var sem = new SemaphoreSlim(10))
{
await Run(async (s, pause, stats) =>
{
// ReSharper disable AccessToDisposedClosure
await sem.WaitAsync();
try
{
return await AsyncOp(s, pause, stats);
}
finally
{
sem.Release();
}
// ReSharper restore AccessToDisposedClosure
});
}
}
static async Task Run(Func<string, int, Stats, Task<int>> getResultAsync)
{
var stats = new Stats();
var rnd = new Random(0x1234);
var result = await GetSource(1000).SelectMany(s => getResultAsync(s, rnd.Next(30), stats).ToObservable()).ToList();
Debug.Assert(stats.CurConcurrentCount == 0);
Debug.Assert(result.Count == 1000);
Debug.Assert(!result.Contains(0));
Debug.WriteLine("Max concurrency = " + stats.MaxConcurrentCount);
}
static IObservable<string> GetSource(int count)
{
return Enumerable.Range(1, count).Select(i => i.ToString()).ToObservable();
}
static Task<int> AsyncOp(string s, int pause, Stats stats)
{
return Task.Run(() =>
{
int cur = Interlocked.Increment(ref stats.CurConcurrentCount);
if (stats.MaxConcurrentCount < cur)
{
lock (stats.MaxConcurrentCountGuard)
{
if (stats.MaxConcurrentCount < cur)
{
stats.MaxConcurrentCount = cur;
}
}
}
try
{
Thread.Sleep(pause);
return int.Parse(s);
}
finally
{
Interlocked.Decrement(ref stats.CurConcurrentCount);
}
});
}
}
}
您可以在 Rx 中使用 Merge
的重载来限制内部可观察量的并发订阅数。
这种形式的Merge
应用于流流。
通常,使用 SelectMany
从事件调用异步任务执行两项工作:它将每个事件投影到一个可观察的流中,该流是其单个事件是结果,并将所有生成的流平展在一起。
要使用Merge
我们必须使用常规Select
将每个事件投影到异步任务的调用中(从而创建流流(,并使用Merge
来扁平化结果。它将以受约束的方式执行此操作,只需在任何时间点订阅提供的固定数量的内部流。
我们必须小心,只在订阅包装内部流时调用每个异步任务调用。将异步任务转换为具有ToObservable()
的可观察量实际上会立即调用异步任务,而不是在订阅时调用,因此我们必须将评估推迟到使用 Observable.Defer
订阅。
下面是将所有这些步骤放在一起的示例:
void Main()
{
var xs = Observable.Range(0, 10); // source events
// "Double" here is our async operation to be constrained,
// in this case to 3 concurrent invocations
xs.Select(x =>
Observable.Defer(() => Double(x).ToObservable())).Merge(3)
.Subscribe(Console.WriteLine,
() => Console.WriteLine("Max: " + MaxConcurrent));
}
private static int Concurrent;
private static int MaxConcurrent;
private static readonly object gate = new Object();
public async Task<int> Double(int x)
{
var concurrent = Interlocked.Increment(ref Concurrent);
lock(gate)
{
MaxConcurrent = Math.Max(concurrent, MaxConcurrent);
}
await Task.Delay(TimeSpan.FromSeconds(1));
Interlocked.Decrement(ref Concurrent);
return x * 2;
}
此处的最大并发输出将为"3"。删除合并以"不受约束",您将获得"10"。
另一种(等效的(获得读起来更好的Defer
效果的方法是使用 FromAsync
而不是 Defer
+ ToObservable
:
xs.Select(x => Observable.FromAsync(() => Double(x))).Merge(3)