我在最近的"Play With Rx"项目中遇到以下情况:
class Program
{
static void Main(string[] args)
{
var observable1 = Observable.Create<int>(
(Func<IObserver<int>, IDisposable>)GenerateSequence);
var observable2 = Observable.Create<int>(
(Func<IObserver<int>, IDisposable>)GenerateSequence);
var merged = observable1.Merge(observable2);
observable1.Subscribe(i => Console.WriteLine("1: " + i.ToString()));
observable2.Subscribe(i => Console.WriteLine("2: " + i.ToString()));
merged.Subscribe(i => Console.WriteLine("Merged: " + i.ToString()));
Console.ReadLine();
}
private static int count = 0;
private static IDisposable GenerateSequence(IObserver<int> observer)
{
ThreadPool.QueueUserWorkItem((o) =>
{
while (true)
{
observer.OnNext(count++);
Thread.Sleep(500);
}
});
return Disposable.Empty;
}
}
现在,我希望看到类似的东西
1: 0
2: 1
Merged: 0
Merged: 1
1: 2
2: 3
Merged: 2
Merged: 3
相反,我看到的是
1: 0
2: 1
Merged: 2
Merged: 3
1: 4
2: 5
Merged: 6
Merged: 7
如果我将循环替换为
while (true)
{
observer.OnNext(r.Next(0, 1000));
Thread.Sleep(500);
}
对于 Random 的静态或局部实例 r,合并的序列中还有其他数字,即两个单独的序列!
我不明白如何从一次observer.OnNext(...)
调用中多次执行count++
或r.Next(0, 1000)
.合并呢?
PS:我试图通过锁定或分离两个线程的循环时间来消除竞争条件,但结果没有变化,所以我将这些尝试排除在外。
编辑:似乎调用了 4 次GenerateSequence
,因此旋转 4 个线程以递增count
。虽然这解释了我所看到的,但我不明白为什么会这样。
- 当你订阅
observable1
时,你订阅了Observable.Create(GenerateSequence)
,它会调用GenerateSequence
并开始循环。 - 当你订阅
observable2
时,你订阅了Observable.Create(GenerateSequence)
,它调用GenerateSequence
并开始循环。 - 当您订阅
merged
时,您订阅了Observable.Merge(observable1, observable2)
,订阅了observable1
和observable2
。我们在前两点中看到了当你做每一个时会发生什么。
最终结果是对 GenerateSequence
的四次调用。
要获得非常接近您正在寻找的效果,您需要查看Publish()
:
var observable1 = Observable
.Create<int>((Func<IObserver<int>, IDisposable>)GenerateSequence)
.Publish();
var observable2 = Observable
.Create<int>((Func<IObserver<int>, IDisposable>)GenerateSequence)
.Publish();
var merged = observable1.Merge(observable2);
observable1.Subscribe(i => Console.WriteLine("1: " + i.ToString()));
observable2.Subscribe(i => Console.WriteLine("2: " + i.ToString()));
merged.Subscribe(i => Console.WriteLine("Merged: " + i.ToString()));
observable1.Connect();
observable2.Connect();
observable1
和observable2
现在属于IConnectableObservable
型,这意味着它们会推迟订阅其底层IObservable
(在您的情况下Observable.Create
),直到它们被调用Connect
。
输出
1: 0
Merged: 0
2: 1
Merged: 1
1: 2
Merged: 2
2: 3
Merged: 3
...