我有下面的repro代码,它演示了一个更复杂的流程中的问题:
static void Main(string[] args)
{
var r = Observable.Range(1, 10).Finally(() => Console.WriteLine("Disposed"));
var x = Observable.Create<int>(o =>
{
for (int i = 1; i < 11; i++)
{
o.OnNext(i);
}
o.OnCompleted();
return Disposable.Create(() => Console.WriteLine("Disposed"));
});
var src = x.Publish().RefCount();
var a = src.Where(i => i % 2 == 0).Do(i => Console.WriteLine("Pair:" + i));
var b = src.Where(i => i % 2 != 0).Do(i => Console.WriteLine("Even:" + i));
var c = Observable.Merge(a, b);
using (c.Subscribe(i => Console.WriteLine("final " + i), () => Console.WriteLine("Complete")))
{
Console.ReadKey();
}
}
用r作为src (var src = r.Publish().RefCount()
)运行这段代码将产生从1到10的所有数字,将src切换为x(如示例中)将只生成对,实际上是第一个订阅的可观察对象,除非我将Publish()更改为Replay()。
为什么?r和x的差是什么?
谢谢。
虽然我没有耐心对Rx进行排序。. NET源代码,以找出导致这种确切行为的具体实现细节,我可以提供以下见解:
你看到的行为差异是由竞态条件引起的。在这种情况下,竞手是a
和b
的订阅,这是由于您订阅了Observable.Merge
返回的可观察对象而发生的。你订阅了c
,它又订阅了a
和b
。a
和b
是根据x
或r
的Publish
和RefCount
来定义的,这取决于您选择哪种情况。
是这样的
src = r
在这个例子中,你使用的是一个自定义的Observable。当订阅时,您的自定义可观察立即和同步开始onNext
数字1到10,然后调用onCompleted
。有趣的是,这个订阅是由你的Publish().RefCount()
Observable在第一次时订阅引起的。第一次被a
订阅,因为a
是Merge
的第一个参数。因此,在Merge
订阅b
之前,您的订阅就已经完成了。Merge
订阅b
,这是RefCount
的可观察对象。这个观察对象已经完成了,所以Merge
寻找下一个要合并的观察对象。由于没有更多的可观察对象需要合并,并且所有现有的可观察对象都已完成,因此合并的可观察对象完成。
在你的自定义可观察对象中onNext的值通过了"pairs"可观察对象,而不是"even "可观察对象。因此,您最终得到以下内容:
// "pairs" (has this been named incorrectly?)
[2, 4, 6, 8, 10]
src = x
在这个例子中,你使用内置的Range
方法来创建一个Observable。当订阅时,这个Range Observable 做一些事情,最终产生1到10。有趣。我们不知道那个方法中发生了什么,也不知道何时发生。然而,我们可以对此进行一些观察。如果我们看看src = r
(上面)时发生了什么,我们可以看到只有第一个订阅生效,因为可观察对象立即并同步地生成。因此,我们可以确定Range Observable不能以同样的方式产生,而是允许应用程序的控制流在产生任何值之前执行对b
的订阅。你的自定义Observable和这个Range Observable之间的区别可能在于Range Observable是调度在CurrentThread
Scheduler上发生的yield。
var src = a.Publish(); // not ref count
var a = src.where(...);
var b = src.where(...);
var c = Observable.Merge(a, b);
var subscription = c.Subscribe(i => Console.WriteLine("final " + i), () => Console.WriteLine("Complete"))
// don't dispose of the subscription. The observable creates an auto-disposing subscription which will call dispose once `OnCompleted` or `OnError` is called.
src.Connect(); // connect to the underlying observable, *after* merge has subscribed to both a and b.
请注意,修复对这个可观察对象组合的订阅的解决方案不是改变源可观察对象的工作方式,而是确保您的订阅逻辑不允许存在任何竞争条件。这一点很重要,因为在Observable中修复这个问题仅仅是改变行为,而不是修复竞争。如果我们稍后更改源并将其切换出去,订阅逻辑仍然会有bug。
我怀疑是调度程序。此更改导致两者的行为相同:
var x = Observable.Create<int>(o =>
{
NewThreadScheduler.Default.Schedule(() =>
{
for (int i = 1; i < 11; i++)
{
o.OnNext(i);
}
o.OnCompleted();
});
return Disposable.Create(() => Console.WriteLine("Disposed"));
});
而使用Scheduler.Immediate
给出与您相同的行为。