我最近一直在尝试了解。net的响应式扩展,但遇到了一点概念上的障碍:我不明白为什么IObservable.First()会阻塞。
我有一些示例代码,看起来有点像这样:
var a = new ListItem(a);
var b = new ListItem(b);
var c = new ListItem(c);
var d = new ListItem(d);
var observableList = new List<ListItem> { a,b,c,d }.ToObservable();
var itemA = observableList.First();
// Never reached
Assert.AreEqual(expectedFoo, itemA.Foo);
我期望发生的是itemA
在引用上等于a
,并且能够访问其成员,等等。相反发生的是First()
阻塞而Assert.AreEqual()
永远不会到达。
现在,我知道当使用Rx时,代码应该Subscribe()
到IObservable
s,所以很可能我在这里做了错误的事情。但是,根据不同的方法签名,不可能执行以下任何一项:
observableList.First().Subscribe(item => Assert.AreEqual(expectedFoo, item));
或
observableList.Subscribe(SomeMethod).First() // This doesn't even make sense, right?
我错过了什么?
在测试项目中尝试此代码运行良好,因此我重新查看了有问题的代码。原来问题是由于IObservable<ListItem>
被Publish()
改造成IConnectableObservable<ListItem>
。如果没有呼叫连接,订阅就永远不会"激活"。
First()
返回T
,而不是Observable<T>
,因此它必须阻塞。
非阻塞表单是xs.Take(1)