合并两个 IObservable 时出现意外行为



我在最近的"Play With Rx"项目中遇到以下情况:

class Program
{
    static void Main(string[] args)
    {
        var observable1 = Observable.Create<int>(
               (Func<IObserver<int>, IDisposable>)GenerateSequence);
        var observable2 = Observable.Create<int>(
               (Func<IObserver<int>, IDisposable>)GenerateSequence);
        var merged = observable1.Merge(observable2);
        observable1.Subscribe(i => Console.WriteLine("1: " + i.ToString()));
        observable2.Subscribe(i => Console.WriteLine("2: " + i.ToString()));
        merged.Subscribe(i => Console.WriteLine("Merged: " + i.ToString()));
        Console.ReadLine();
    }
    private static int count = 0;
    private static IDisposable GenerateSequence(IObserver<int> observer)
    {
        ThreadPool.QueueUserWorkItem((o) =>
        {
            while (true)
            {
                observer.OnNext(count++);
                Thread.Sleep(500);
            }
        });
        return Disposable.Empty;
    }
}

现在,我希望看到类似的东西

1: 0
2: 1
Merged: 0
Merged: 1
1: 2
2: 3
Merged: 2
Merged: 3

相反,我看到的是

1: 0
2: 1
Merged: 2
Merged: 3
1: 4
2: 5
Merged: 6
Merged: 7

如果我将循环替换为

while (true)
{
    observer.OnNext(r.Next(0, 1000));
    Thread.Sleep(500);
}

对于 Random 的静态或局部实例 r,合并的序列中还有其他数字,即两个单独的序列!

我不明白如何从一次observer.OnNext(...)调用中多次执行count++r.Next(0, 1000).合并呢?

PS:我试图通过锁定或分离两个线程的循环时间来消除竞争条件,但结果没有变化,所以我将这些尝试排除在外。

编辑:似乎调用了 4 次GenerateSequence,因此旋转 4 个线程以递增count。虽然这解释了我所看到的,但我不明白为什么会这样。

  • 当你订阅observable1时,你订阅了Observable.Create(GenerateSequence),它会调用GenerateSequence并开始循环。
  • 当你订阅observable2时,你订阅了Observable.Create(GenerateSequence),它调用GenerateSequence并开始循环。
  • 当您订阅merged时,您订阅了Observable.Merge(observable1, observable2),订阅了observable1observable2。我们在前两点中看到了当你做每一个时会发生什么。

最终结果是对 GenerateSequence 的四次调用。

要获得非常接近您正在寻找的效果,您需要查看Publish()

var observable1 = Observable
    .Create<int>((Func<IObserver<int>, IDisposable>)GenerateSequence)
    .Publish();
var observable2 = Observable
    .Create<int>((Func<IObserver<int>, IDisposable>)GenerateSequence)
    .Publish();
var merged = observable1.Merge(observable2);
observable1.Subscribe(i => Console.WriteLine("1: " + i.ToString()));
observable2.Subscribe(i => Console.WriteLine("2: " + i.ToString()));
merged.Subscribe(i => Console.WriteLine("Merged: " + i.ToString()));
observable1.Connect();
observable2.Connect();

observable1observable2现在属于IConnectableObservable型,这意味着它们会推迟订阅其底层IObservable(在您的情况下Observable.Create),直到它们被调用Connect

输出

1: 0
Merged: 0
2: 1
Merged: 1
1: 2
Merged: 2
2: 3
Merged: 3
...

相关内容

  • 没有找到相关文章

最新更新