RX误解行为



我有下面的repro代码,它演示了一个更复杂的流程中的问题:

static void Main(string[] args)
    {
        var r = Observable.Range(1, 10).Finally(() => Console.WriteLine("Disposed"));
        var x = Observable.Create<int>(o =>
            {
                for (int i = 1; i < 11; i++)
                {
                    o.OnNext(i);
                }
                o.OnCompleted();
                return Disposable.Create(() => Console.WriteLine("Disposed"));
            });
        var src = x.Publish().RefCount();
        var a = src.Where(i => i % 2 == 0).Do(i => Console.WriteLine("Pair:" + i));
        var b = src.Where(i => i % 2 != 0).Do(i => Console.WriteLine("Even:" + i));
        var c = Observable.Merge(a, b);
        using (c.Subscribe(i => Console.WriteLine("final " + i), () => Console.WriteLine("Complete")))
        {
            Console.ReadKey();
        }
    }

用r作为src (var src = r.Publish().RefCount())运行这段代码将产生从1到10的所有数字,将src切换为x(如示例中)将只生成对,实际上是第一个订阅的可观察对象,除非我将Publish()更改为Replay()。

为什么?r和x的差是什么?

谢谢。

虽然我没有耐心对Rx进行排序。. NET源代码,以找出导致这种确切行为的具体实现细节,我可以提供以下见解:

你看到的行为差异是由竞态条件引起的。在这种情况下,竞手是ab的订阅,这是由于您订阅了Observable.Merge返回的可观察对象而发生的。你订阅了c,它又订阅了abab是根据xrPublishRefCount来定义的,这取决于您选择哪种情况。

是这样的

src = r

在这个例子中,你使用的是一个自定义的Observable。当订阅时,您的自定义可观察立即同步开始onNext数字1到10,然后调用onCompleted。有趣的是,这个订阅是由你的Publish().RefCount() Observable在第一次时订阅引起的。第一次被a订阅,因为aMerge的第一个参数。因此,在Merge订阅b之前,您的订阅就已经完成了。Merge订阅b,这是RefCount的可观察对象。这个观察对象已经完成了,所以Merge寻找下一个要合并的观察对象。由于没有更多的可观察对象需要合并,并且所有现有的可观察对象都已完成,因此合并的可观察对象完成。

在你的自定义可观察对象中onNext的值通过了"pairs"可观察对象,而不是"even "可观察对象。因此,您最终得到以下内容:

// "pairs" (has this been named incorrectly?)
[2, 4, 6, 8, 10]

src = x

在这个例子中,你使用内置的Range方法来创建一个Observable。当订阅时,这个Range Observable 做一些事情,最终产生1到10。有趣。我们不知道那个方法中发生了什么,也不知道何时发生。然而,我们可以对此进行一些观察。如果我们看看src = r(上面)时发生了什么,我们可以看到只有第一个订阅生效,因为可观察对象立即并同步地生成。因此,我们可以确定Range Observable不能以同样的方式产生,而是允许应用程序的控制流在产生任何值之前执行对b 的订阅。你的自定义Observable和这个Range Observable之间的区别可能在于Range Observable是调度CurrentThread Scheduler上发生的yield。

如何避免这种竞争条件:
var src = a.Publish(); // not ref count
var a = src.where(...);
var b = src.where(...);
var c = Observable.Merge(a, b);
var subscription = c.Subscribe(i => Console.WriteLine("final " + i), () => Console.WriteLine("Complete"))
// don't dispose of the subscription. The observable creates an auto-disposing subscription which will call dispose once `OnCompleted` or `OnError` is called.
src.Connect(); // connect to the underlying observable, *after* merge has subscribed to both a and b.

请注意,修复对这个可观察对象组合的订阅的解决方案不是改变源可观察对象的工作方式,而是确保您的订阅逻辑不允许存在任何竞争条件。这一点很重要,因为在Observable中修复这个问题仅仅是改变行为,而不是修复竞争。如果我们稍后更改源并将其切换出去,订阅逻辑仍然会有bug。

我怀疑是调度程序。此更改导致两者的行为相同:

var x = Observable.Create<int>(o =>
    {
        NewThreadScheduler.Default.Schedule(() =>
        {
            for (int i = 1; i < 11; i++)
            {
                o.OnNext(i);
            }
            o.OnCompleted();
        });
        return Disposable.Create(() => Console.WriteLine("Disposed"));
    });

而使用Scheduler.Immediate给出与您相同的行为。

相关内容

  • 没有找到相关文章

最新更新