我试图从两个数组(IEnumerable
s)创建一个IObservable<T>
。我试图避免显式迭代数组并调用observer.OnNext
。我遇到了Observable。订阅扩展方法,乍一看,这似乎是我需要的。然而,它没有像我期望的那样工作,我不知道为什么。
下面的代码是一个例子:
class Program
{
static void Main(string[] args)
{
var observable = Observable.Create<char>(observer =>
{
var firstBytes = new[] {'A'};
var secondBytes = new[] {'Z', 'Y'};
firstBytes.Subscribe(observer);
secondBytes.Subscribe(observer);
return Disposable.Empty;
}
);
observable.Subscribe(b => Console.Write(b));
}
}
它的输出是"AZ",而不是我期望的"AZY"。现在,如果我在firstBytes
之前订阅secondBytes
,输出是"ZAY"!这似乎表明它在同步枚举两个数组——这就解释了"AZ"输出。
无论如何,我完全不知道为什么它会这样做,如果有人能提供任何见解,我将不胜感激。
锁步迭代行为的原因可以通过Observable的实现来解释。订阅(IEnumerable源),它使用"递归"算法,该算法通过在调度程序操作中调用e.MoveNext
来工作。如果成功,则发出该值,然后将a 新调度器动作排队以从可枚举对象中读取下一个值。
由于您订阅了两个枚举,并且没有为订阅指定任何特定的调度器,因此这些操作(由SchedulerDefaults.Iteration
定义)将使用默认的迭代调度器,默认在当前线程上运行。这意味着枚举操作将在当前订阅操作完成后排队运行。这会导致枚举动作交错出现——类似于
- firstBytes.Subscribe() ->队列枚举动作
- secondBytes.Subscribe() ->队列枚举动作
- call firstBytes.MoveNext() -> OnNext("A") -> queue next enumeration action
- call secondBytes.MoveNext() -> OnNext("Z") -> queue next enumeration action
- 调用firstBytes.MoveNext() -> OnCompleted()
- call secondBytes.MoveNext() -> OnNext(Y) -> queue next enumeration action
- call secondBytes.MoveNext() -> OnCompleted()
观察者在第5步收到OnCompleted()通知,因此剩余的secondBytes枚举步骤被忽略。如果您已经返回了您的一次性订阅,那么第二次枚举将在此时取消。
因为您订阅了两个可观察对象,而不是将两个可观察对象连接在一起的单个可观察对象,所以有两个可能的源可以调用观察者的OnComplete
方法。由于第一个数组较短,因此它在发出第一个项目后完成,并且观察者在收到完成通知后取消订阅。
正确的方法是将两个序列合并为一个序列,然后订阅该序列:
var observable = Observable.Create<char>(observer =>
{
var firstBytes = new[] { 'A' };
var secondBytes = new[] { 'Z', 'Y' };
return firstBytes.Concat(secondBytes).Subscribe(observer);
});
observable.Subscribe(Console.Write);