我正在学习rx.net,并且我正在建立一个在许多间隔内运行的调度程序。这个想法是我有一系列间隔和一系列命令,然后将它们核对并订阅合并结果。每个命令应与其各自的间隔关联,并应按照其关联定义的间隔(Cadence)执行。
虽然我能够将间隔合并在一起,但我有一段时间试图弄清楚如何将此命令(状态)传递给订阅。 我确实看到了Scan
功能,但这看起来是汇总的,而不是夫妇/元组。这是我的代码:
var intervals = new[]
{
TimeSpan.FromSeconds(30),
TimeSpan.FromSeconds(60),
TimeSpan.FromSeconds(90)
};
var commands = new Action[]
{
() => Console.WriteLine("30 Seconds!"),
() => Console.WriteLine("60 Seconds!"),
() => Console.WriteLine("90 Seconds!")
};
intervals.Zip(commands, ValueTuple.Create)
.Select(x => Observable.Interval(x.Item1)) // <-- Need magic here. :)
.Merge()
.Subscribe(x =>
{
x.Item2(); // Broken, x is a long.
});
RX中有没有办法做我想在这里实现的目标?类似于Scan
,但不是累加器,而是脱衣舞机(如果是单词)。
如果您希望在30、60、90秒的间隔重复操作:
intervals.Zip(commands, ValueTuple.Create)
.Select(x => Observable.Interval(x.Item1).Select(_ => x))
.Merge()
.Subscribe(x =>
{
x.Item2(); // Broken, x is a long.
});
如果您只想触发一次操作:
intervals.Zip(commands, ValueTuple.Create)
.Select(x => Observable.Delay(Observable.Return(x), x.Item1))
.Merge()
.Subscribe(x =>
{
x.Item2(); // Broken, x is a long.
});
我要使用一个稍有不同的版本来查询所需的查询。
现在,您的代码的查询部分(直至和包含.Merge()
仅选择命令 - 您正在依靠订户执行命令。。这在某些情况下可能是正确的逻辑,但不是全部。
我的想法是,当您订阅可观察到的可观察到的时,您希望确保运行命令,无论订户的工作如何。订户只应对可观察的值反应,如果值应该应该产生值。
这是为此的代码:
IObservable<Unit> query =
intervals
.Zip(commands, (Interval, Command) => new { Interval, Command })
.Select(x =>
Observable
.Interval(x.Interval)
.SelectMany(y =>
Observable
.Start(() => x.Command())))
.Merge();
每当观察者订阅时,都会确保命令运行。观察者可以报告这样做。
最基本的订阅现在只是IDisposable subscription = query.Subscribe();
。简单。