我将如何用反应性编程编写此代码



我刚刚开始用反应性编程弄乱,我知道足以编写代码,但不足以弄清楚当我不收到我的期望时会发生什么。除博客文章外,我真的没有其他导师。我还没有找到一个很好的解决方案,可以解决我的情况,我对正确的方法感到好奇。

问题:

我需要获得一个由bar对象组成的foo。我从Web服务中获取栏对象。因此,我将每个Web服务呼叫代表为一个可让我希望在完成之前0或1个元素的iObservable。我想制作一个将要:

  • 订阅每个iObservable实例。
  • 等待2秒的超时。
  • 当两个序列完成或超时发生时:
    • 使用生成的任何条对象创建一个数组(可能有0。)
    • 使用该栏[]产生foo对象。

我用这一点代码来完成此操作:

public Foo CreateFoo() {
        var producer1 = webService.BarGenerator()
                                  .Timeout(TimeSpan.FromSeconds(2), Observable.Empty<Bar>());
        var producer2 = // similar to above
        var pipe = producer1.Concat(producer2);
        Bar[] result = pipe.ToEnumerable().ToArray();
        ...
}

这似乎是不正确的,原因有很多。最明显的是Concat()将在序列而不是并行启动序列,因此这是一个4秒的超时。我真的不在乎它会阻止它,这实际上很方便我正在使用的架构。我对这种方法成为iObservable的生成器我很好,但是当我尝试时,这里有一些额外的警告似乎使人具有挑战性:

  1. 我需要最后一个数组来将Producer1和Producer2的结果放在该顺序中,如果它们都产生结果。
  2. 我想使用testscheduler验证超时,但还没有成功,我显然根本不了解调度程序。
  3. 最终,这是一个拉动模型,无论将其带到一个明显的位置,并且没有价值"飞行"没有价值。也许这倾斜了"不要使用RX"的答案。老实说,我被卡住了,我切换到了基于任务的API。但是我想看看一个人如何与Rx一起对此进行操作,因为我想学习。
    var pipe = producer1
        .Merge(producer2)
        .Buffer(Observable.Timer(TimeSpan.FromSeconds(2), testScheduler))
        .Take(1);
    var subscription = pipe
        .Select(list => new Foo(list.ToArray())
        .Subscribe(foo => {} /* Do whatever you want with your foo here.*/);

Buffer采用窗口中发出的所有元素(在我们的情况下为两秒钟),并输出一个列表。

如果要坚持使用拉模型,而不是订阅,则可以这样做:

var list = await pipe;
var foo = new Foo(list.ToArray());
//....

相关内容

  • 没有找到相关文章

最新更新