如何将可观察的状态配对



我正在学习rx.net,并且我正在建立一个在许多间隔内运行的调度程序。这个想法是我有一系列间隔和一系列命令,然后将它们核对并订阅合并结果。每个命令应与其各自的间隔关联,并应按照其关联定义的间隔(Cadence)执行。

虽然我能够将间隔合并在一起,但我有一段时间试图弄清楚如何将此命令(状态)传递给订阅。 我确实看到了Scan功能,但这看起来是汇总的,而不是夫妇/元组。这是我的代码:

var intervals = new[]
                {
                    TimeSpan.FromSeconds(30),
                    TimeSpan.FromSeconds(60),
                    TimeSpan.FromSeconds(90)
                };
var commands = new Action[]
                {
                    () => Console.WriteLine("30 Seconds!"),
                    () => Console.WriteLine("60 Seconds!"),
                    () => Console.WriteLine("90 Seconds!")
                };
intervals.Zip(commands, ValueTuple.Create)
         .Select(x => Observable.Interval(x.Item1)) // <-- Need magic here. :)
         .Merge()
         .Subscribe(x =>
                    {
                        x.Item2(); // Broken, x is a long.
                    });

RX中有没有办法做我想在这里实现的目标?类似于Scan,但不是累加器,而是脱衣舞机(如果是单词)。

如果您希望在30、60、90秒的间隔重复操作:

intervals.Zip(commands, ValueTuple.Create)
         .Select(x => Observable.Interval(x.Item1).Select(_ => x))
         .Merge()
         .Subscribe(x =>
                    {
                        x.Item2(); // Broken, x is a long.
                    });

如果您只想触发一次操作:

intervals.Zip(commands, ValueTuple.Create)
         .Select(x => Observable.Delay(Observable.Return(x), x.Item1)) 
         .Merge()
         .Subscribe(x =>
                    {
                        x.Item2(); // Broken, x is a long.
                });

我要使用一个稍有不同的版本来查询所需的查询。

现在,您的代码的查询部分(直至和包含.Merge()仅选择命令 - 您正在依靠订户执行命令。。这在某些情况下可能是正确的逻辑,但不是全部。

我的想法是,当您订阅可观察到的可观察到的时,您希望确保运行命令,无论订户的工作如何。订户只应对可观察的值反应,如果值应该应该产生值。

这是为此的代码:

IObservable<Unit> query =
    intervals
        .Zip(commands, (Interval, Command) => new { Interval, Command })
        .Select(x =>
            Observable
                .Interval(x.Interval)
                .SelectMany(y =>
                    Observable
                        .Start(() => x.Command())))
        .Merge();

每当观察者订阅时,都会确保命令运行。观察者可以报告这样做。

最基本的订阅现在只是IDisposable subscription = query.Subscribe();。简单。

相关内容

  • 没有找到相关文章

最新更新