可观察的行为.与多个IEnumerables一起使用时订阅



我试图从两个数组(IEnumerable s)创建一个IObservable<T>。我试图避免显式迭代数组并调用observer.OnNext。我遇到了Observable。订阅扩展方法,乍一看,这似乎是我需要的。然而,它没有像我期望的那样工作,我不知道为什么。

下面的代码是一个例子:

  class Program
  {
    static void Main(string[] args)
    {
      var observable = Observable.Create<char>(observer =>
        {
          var firstBytes = new[] {'A'};
          var secondBytes = new[] {'Z', 'Y'};
          firstBytes.Subscribe(observer);
          secondBytes.Subscribe(observer);
          return Disposable.Empty;
        }
      );
      observable.Subscribe(b => Console.Write(b));
    }
  }

它的输出是"AZ",而不是我期望的"AZY"。现在,如果我在firstBytes之前订阅secondBytes,输出是"ZAY"!这似乎表明它在同步枚举两个数组——这就解释了"AZ"输出。

无论如何,我完全不知道为什么它会这样做,如果有人能提供任何见解,我将不胜感激。

锁步迭代行为的原因可以通过Observable的实现来解释。订阅(IEnumerable源),它使用"递归"算法,该算法通过在调度程序操作中调用e.MoveNext来工作。如果成功,则发出该值,然后将a 新调度器动作排队以从可枚举对象中读取下一个值。

由于您订阅了两个枚举,并且没有为订阅指定任何特定的调度器,因此这些操作(由SchedulerDefaults.Iteration定义)将使用默认的迭代调度器,默认在当前线程上运行。这意味着枚举操作将在当前订阅操作完成后排队运行。这会导致枚举动作交错出现——类似于

  1. firstBytes.Subscribe() ->队列枚举动作
  2. secondBytes.Subscribe() ->队列枚举动作
  3. call firstBytes.MoveNext() -> OnNext("A") -> queue next enumeration action
  4. call secondBytes.MoveNext() -> OnNext("Z") -> queue next enumeration action
  5. 调用firstBytes.MoveNext() -> OnCompleted()
  6. call secondBytes.MoveNext() -> OnNext(Y) -> queue next enumeration action
  7. call secondBytes.MoveNext() -> OnCompleted()

观察者在第5步收到OnCompleted()通知,因此剩余的secondBytes枚举步骤被忽略。如果您已经返回了您的一次性订阅,那么第二次枚举将在此时取消。

因为您订阅了两个可观察对象,而不是将两个可观察对象连接在一起的单个可观察对象,所以有两个可能的源可以调用观察者的OnComplete方法。由于第一个数组较短,因此它在发出第一个项目后完成,并且观察者在收到完成通知后取消订阅。

正确的方法是将两个序列合并为一个序列,然后订阅该序列:

var observable = Observable.Create<char>(observer =>
{
    var firstBytes = new[] { 'A' };
    var secondBytes = new[] { 'Z', 'Y' };
    return firstBytes.Concat(secondBytes).Subscribe(observer);
});
observable.Subscribe(Console.Write);

相关内容

  • 没有找到相关文章