如何在 Rx.Net 中定义ConcatMap



Rx.Net 目前还没有concatMap等效物,但考虑到可用的功能,肯定有一种方法可以获得类似的行为。我现在有observable.SelectMany(x => ProcessItemAsync(item).ToObservable()) ProcessItemAsync是一个异步方法,我希望按顺序一个接一个地执行,而不是一次执行所有项目。

如果我正确理解 Rx,observable.ConcatMap(x => ProcessItemAsync(item).ToObservable())应该这样做,但它目前在 Rx.Net 中不存在,那么实现相同行为的另一种方法是什么?

我可能有多个可观察量源,并且每个源都可以并行执行ProcessItemAsync,它应该只是在流顺序内,以保留输入/输出顺序,所以我不能将其锁定在ProcessItemAsync

更新:

鉴于类似的问题 反应式扩展 选择许多 和 Concat 我举了一个例子:https://dotnetfiddle.net/cG3T2v - 使用 SemaphoreSlim 是我管理它工作的唯一方法。

(这里是示例中的代码(

using System;
using System.Reactive.Linq;
using System.Reactive.Threading.Tasks;
using System.Threading;
using System.Threading.Tasks;
namespace ConsoleApp1
{
    class Program
    {
        static readonly SemaphoreSlim semaphore = new SemaphoreSlim(1, 1);
        static async Task Main(string[] args)
        {
            Console.WriteLine("Kinda in order ...");
            var kindaInOrder = Observable.Range(1, 4)
                .Select(x => LongProcessAsync(x).ToObservable())
                .Concat()
                .Do(x => Console.WriteLine($"{x} concatenated"));
            await kindaInOrder.RunAsync(CancellationToken.None);
            Console.WriteLine();
            Console.WriteLine("Completely in order ...");
            var completelyInOrder = Observable.Range(1, 4)
                .Select(x => OrderedLongProcessAsync(x).ToObservable())
                .Concat()
                .Do(x => Console.WriteLine($"{x} concatenated"));
            await completelyInOrder.RunAsync(CancellationToken.None);
        }
        static async Task<int> LongProcessAsync(int n)
        {
            Console.WriteLine($"Job {n} started");
            await Task.Delay(TimeSpan.FromSeconds(4 - n));
            Console.WriteLine($"Job {n} done");
            return n;
        }
        static async Task<int> OrderedLongProcessAsync(int n)
        {
            await semaphore.WaitAsync();
            try
            {
                return await LongProcessAsync(n);
            }
            finally
            {
                semaphore.Release();
            }
        }
    }
}
您可以使用

.Select().Merge()的组合来获得所描述的行为(仅在前一个完成时订阅每个内部生成的可观察对象(,并将maxConcurrent参数设置为 1。

static IObservable<TU> ConcatMap<T, TU>(this IObservable<T> source, Func<T, IObservable<TU>> selector) {
    return source.Select(selector).Merge(maxConcurrent: 1);
}

为了确保长时间运行的操作仅在前一个操作完成后启动,选择器必须返回"冷"可观察量:

source.ConcatMap(t => Observable.FromAsync(() => LongProcessAsync(t)))

您可以通过组合.Select().Concat()来获取ConcatMap的行为。

将下面粘贴到 LINQPad 进行尝试

public void Main()
{
    // Generate 100 items and 'process' async
    Observable
        .Range(0, 100)
        .Select(x => ProcessItemAsync(x))
        .Concat()
        .Dump();
}
private readonly Random _rnd = new Random();
public async Task<int> ProcessItemAsync(int item)
{
    await Task.Delay(_rnd.Next(0, 1000));
    return item;
}

代码启动 100 个需要不同时间才能完成的"作业",但您将看到结果的可观察量,以作业启动的顺序返回结果,而不是它们完成的顺序,如果您使用 .SelectMany()

(编辑:在原始帖子中,我制作了一个扩展方法,即选择/映射给定的异步 Func。然后我创建了一个类,其中等待、索引和存储结果,按顺序发出。但后来我在这里找到了一个更简单的解决方案:响应式扩展 SelectMany 和 Concat 的答案,这是 Select Concat 组合。

为了跟进我的评论,这里是建议的解决方案。

相反

.Select(x => LongProcessAsync(x).ToObservable())

使用以下内容:

.Select(x => Observable.Create<int>(async o =>
{
    try
    {
        var result = await LongProcessAsync(x);
        o.OnNext(result);
        o.OnCompleted();
    }
    catch (Exception ex)
    {
        o.OnError(ex);
    }
}))

相关内容

  • 没有找到相关文章

最新更新