Rx.Net 目前还没有concatMap等效物,但考虑到可用的功能,肯定有一种方法可以获得类似的行为。我现在有observable.SelectMany(x => ProcessItemAsync(item).ToObservable())
ProcessItemAsync
是一个异步方法,我希望按顺序一个接一个地执行,而不是一次执行所有项目。
如果我正确理解 Rx,observable.ConcatMap(x => ProcessItemAsync(item).ToObservable())
应该这样做,但它目前在 Rx.Net 中不存在,那么实现相同行为的另一种方法是什么?
我可能有多个可观察量源,并且每个源都可以并行执行ProcessItemAsync
,它应该只是在流顺序内,以保留输入/输出顺序,所以我不能将其锁定在ProcessItemAsync
更新:
鉴于类似的问题 反应式扩展 选择许多 和 Concat 我举了一个例子:https://dotnetfiddle.net/cG3T2v - 使用 SemaphoreSlim
是我管理它工作的唯一方法。
(这里是示例中的代码(
using System;
using System.Reactive.Linq;
using System.Reactive.Threading.Tasks;
using System.Threading;
using System.Threading.Tasks;
namespace ConsoleApp1
{
class Program
{
static readonly SemaphoreSlim semaphore = new SemaphoreSlim(1, 1);
static async Task Main(string[] args)
{
Console.WriteLine("Kinda in order ...");
var kindaInOrder = Observable.Range(1, 4)
.Select(x => LongProcessAsync(x).ToObservable())
.Concat()
.Do(x => Console.WriteLine($"{x} concatenated"));
await kindaInOrder.RunAsync(CancellationToken.None);
Console.WriteLine();
Console.WriteLine("Completely in order ...");
var completelyInOrder = Observable.Range(1, 4)
.Select(x => OrderedLongProcessAsync(x).ToObservable())
.Concat()
.Do(x => Console.WriteLine($"{x} concatenated"));
await completelyInOrder.RunAsync(CancellationToken.None);
}
static async Task<int> LongProcessAsync(int n)
{
Console.WriteLine($"Job {n} started");
await Task.Delay(TimeSpan.FromSeconds(4 - n));
Console.WriteLine($"Job {n} done");
return n;
}
static async Task<int> OrderedLongProcessAsync(int n)
{
await semaphore.WaitAsync();
try
{
return await LongProcessAsync(n);
}
finally
{
semaphore.Release();
}
}
}
}
.Select()
和.Merge()
的组合来获得所描述的行为(仅在前一个完成时订阅每个内部生成的可观察对象(,并将maxConcurrent
参数设置为 1。
static IObservable<TU> ConcatMap<T, TU>(this IObservable<T> source, Func<T, IObservable<TU>> selector) {
return source.Select(selector).Merge(maxConcurrent: 1);
}
为了确保长时间运行的操作仅在前一个操作完成后启动,选择器必须返回"冷"可观察量:
source.ConcatMap(t => Observable.FromAsync(() => LongProcessAsync(t)))
您可以通过组合.Select()
和.Concat()
来获取ConcatMap的行为。
将下面粘贴到 LINQPad 进行尝试
public void Main()
{
// Generate 100 items and 'process' async
Observable
.Range(0, 100)
.Select(x => ProcessItemAsync(x))
.Concat()
.Dump();
}
private readonly Random _rnd = new Random();
public async Task<int> ProcessItemAsync(int item)
{
await Task.Delay(_rnd.Next(0, 1000));
return item;
}
代码启动 100 个需要不同时间才能完成的"作业",但您将看到结果的可观察量,以作业启动的顺序返回结果,而不是它们完成的顺序,如果您使用 .SelectMany()
(编辑:在原始帖子中,我制作了一个扩展方法,即选择/映射给定的异步 Func。然后我创建了一个类,其中等待、索引和存储结果,按顺序发出。但后来我在这里找到了一个更简单的解决方案:响应式扩展 SelectMany 和 Concat 的答案,这是 Select Concat 组合。
为了跟进我的评论,这里是建议的解决方案。
相反
.Select(x => LongProcessAsync(x).ToObservable())
使用以下内容:
.Select(x => Observable.Create<int>(async o =>
{
try
{
var result = await LongProcessAsync(x);
o.OnNext(result);
o.OnCompleted();
}
catch (Exception ex)
{
o.OnError(ex);
}
}))