基于下游条件的RX过滤器



我正在尝试根据下游条件过滤掉更多的上游项目。 mapProcess本质上启动了一个进程(脚本或exe(。该过程可能需要一些时间才能完成,我想忽略任何进一步的上游项目,直到完成。createProcess 还返回 StdOut 的 Observable。我们切换到由createProcess创建的IObservable,并将arg映射到StdOut。

例:

let mapProcess obs =
  obs
  |> Observable.map (fun arg -> createProcess arg)
  |> Observable.switch

我尝试过什么:这有效,但对这里的可变对象并不完全满意。

let mapProcess obs =
  let mutable processNotRunning = true
  obs
  |> Observable.filter (fun _ -> processNotRunning)
  |> Observable.map (fun arg -> processNotRunning <- false
                                createProcess arg)  
  |> Observable.switch
  |> Observable.iter (fun _ -> processNotRunning <- true)
  |> Observable.finallyDo (fun _ -> processNotRunning <- true)

我认为我可能需要的是某种"switchIfSeen"可观察函数,该函数仅在当前订阅的可观察量生成项目或已完成时才会切换。我是否可能通过组合一些现有的 RX 功能而错过了更简单的方法?

[...] 仅在当前订阅的可观察量生成 项目或已完成

忽略值的一种方法是将热可观察量转换为冷可观察量 - 因此无论谁在听,它都可以推出项目。然后,你只在需要时倾听。

  var map = argn.Select(CreateProcess).Publish().RefCount();
        map.SelectMany(o => o) //flatmap
           .Take(1)
           .Repeat()
           .Subscribe(d => Console.WriteLine($"Did task which took {d * 100}msecs" ));

测试:

(CreateProcess只是一个计时器,它创建 x100 毫秒的固定延迟。

    private static void Main(string[] args)
    {
        var argn = Observable.Interval(TimeSpan.FromMilliseconds(100)).Publish().RefCount();
        argn.Subscribe(Console.WriteLine);
        var map = argn.Select(CreateProcess).Publish().RefCount();
        map.SelectMany(o => o)
           .Take(1)
           .Repeat()
           .Subscribe(d => Console.WriteLine($"Did task which took {d * 100}msecs" ));
        Console.ReadKey();
    }
    static IObservable<long> CreateProcess(long i) => Observable.Timer(TimeSpan.FromMilliseconds(i * 100)).Select(_ => i);

输出:

0
Did task which took 0msecs
1
2
Did task which took 100msecs
3
4
5
Did task which took 300msecs
6
7
8
9
10
11
Did task which took 600msecs

相关内容

  • 没有找到相关文章

最新更新