我刚刚开始用反应性编程弄乱,我知道足以编写代码,但不足以弄清楚当我不收到我的期望时会发生什么。除博客文章外,我真的没有其他导师。我还没有找到一个很好的解决方案,可以解决我的情况,我对正确的方法感到好奇。
问题:
我需要获得一个由bar对象组成的foo。我从Web服务中获取栏对象。因此,我将每个Web服务呼叫代表为一个可让我希望在完成之前0或1个元素的iObservable。我想制作一个将要:
- 订阅每个iObservable实例。
- 等待2秒的超时。
- 当两个序列完成或超时发生时:
- 使用生成的任何条对象创建一个数组(可能有0。)
- 使用该栏[]产生foo对象。
我用这一点代码来完成此操作:
public Foo CreateFoo() {
var producer1 = webService.BarGenerator()
.Timeout(TimeSpan.FromSeconds(2), Observable.Empty<Bar>());
var producer2 = // similar to above
var pipe = producer1.Concat(producer2);
Bar[] result = pipe.ToEnumerable().ToArray();
...
}
这似乎是不正确的,原因有很多。最明显的是Concat()将在序列而不是并行启动序列,因此这是一个4秒的超时。我真的不在乎它会阻止它,这实际上很方便我正在使用的架构。我对这种方法成为iObservable的生成器我很好,但是当我尝试时,这里有一些额外的警告似乎使人具有挑战性:
- 我需要最后一个数组来将Producer1和Producer2的结果放在该顺序中,如果它们都产生结果。
- 我想使用testscheduler验证超时,但还没有成功,我显然根本不了解调度程序。
- 最终,这是一个拉动模型,无论将其带到一个明显的位置,并且没有价值"飞行"没有价值。也许这倾斜了"不要使用RX"的答案。老实说,我被卡住了,我切换到了基于任务的API。但是我想看看一个人如何与Rx一起对此进行操作,因为我想学习。
var pipe = producer1
.Merge(producer2)
.Buffer(Observable.Timer(TimeSpan.FromSeconds(2), testScheduler))
.Take(1);
var subscription = pipe
.Select(list => new Foo(list.ToArray())
.Subscribe(foo => {} /* Do whatever you want with your foo here.*/);
Buffer
采用窗口中发出的所有元素(在我们的情况下为两秒钟),并输出一个列表。
如果要坚持使用拉模型,而不是订阅,则可以这样做:
var list = await pipe;
var foo = new Foo(list.ToArray());
//....