我正在尝试根据下游条件过滤掉更多的上游项目。 mapProcess本质上启动了一个进程(脚本或exe(。该过程可能需要一些时间才能完成,我想忽略任何进一步的上游项目,直到完成。createProcess 还返回 StdOut 的 Observable。我们切换到由createProcess创建的IObservable,并将arg映射到StdOut。
例:
let mapProcess obs =
obs
|> Observable.map (fun arg -> createProcess arg)
|> Observable.switch
我尝试过什么:这有效,但对这里的可变对象并不完全满意。
let mapProcess obs =
let mutable processNotRunning = true
obs
|> Observable.filter (fun _ -> processNotRunning)
|> Observable.map (fun arg -> processNotRunning <- false
createProcess arg)
|> Observable.switch
|> Observable.iter (fun _ -> processNotRunning <- true)
|> Observable.finallyDo (fun _ -> processNotRunning <- true)
我认为我可能需要的是某种"switchIfSeen"可观察函数,该函数仅在当前订阅的可观察量生成项目或已完成时才会切换。我是否可能通过组合一些现有的 RX 功能而错过了更简单的方法?
[...] 仅在当前订阅的可观察量生成 项目或已完成
忽略值的一种方法是将热可观察量转换为冷可观察量 - 因此无论谁在听,它都可以推出项目。然后,你只在需要时倾听。
var map = argn.Select(CreateProcess).Publish().RefCount();
map.SelectMany(o => o) //flatmap
.Take(1)
.Repeat()
.Subscribe(d => Console.WriteLine($"Did task which took {d * 100}msecs" ));
测试:
(CreateProcess
只是一个计时器,它创建 x100 毫秒的固定延迟。
private static void Main(string[] args)
{
var argn = Observable.Interval(TimeSpan.FromMilliseconds(100)).Publish().RefCount();
argn.Subscribe(Console.WriteLine);
var map = argn.Select(CreateProcess).Publish().RefCount();
map.SelectMany(o => o)
.Take(1)
.Repeat()
.Subscribe(d => Console.WriteLine($"Did task which took {d * 100}msecs" ));
Console.ReadKey();
}
static IObservable<long> CreateProcess(long i) => Observable.Timer(TimeSpan.FromMilliseconds(i * 100)).Select(_ => i);
输出:
0
Did task which took 0msecs
1
2
Did task which took 100msecs
3
4
5
Did task which took 300msecs
6
7
8
9
10
11
Did task which took 600msecs