如果CancellationDisposable与MultiCast一起使用,则不会激发OnCompleted



我使用使用CancellationDisposableObservable.Create实现了一个可观察对象。由于我想在创建后共享可观察到的内容,我使用了Publish和更高版本的Connect。如果Connect返回的一次性物品被丢弃,则CancellationDisposable也被丢弃并且抛出OperationCancelledException。问题是观察者o1从未被告知异常。为什么会发生这种情况?我如何熟练地将CancellationDisposablePublish组合使用?

var obs = Observable.Create<int>(
            observer =>
                {
                    var cancel = new CancellationDisposable();
                    var scheduler = Scheduler.Default.Schedule(() =>
                        {
                            try
                            {
                                observer.OnNext(1);
                                Thread.Sleep(2000);
                                cancel.Token.ThrowIfCancellationRequested();
                                observer.OnNext(2);
                                observer.OnCompleted();
                            }
                            catch (Exception ex)
                            {
                                observer.OnError(ex);
                            }
                        });
                    return new CompositeDisposable(cancel, scheduler);
                })
            .ObserveOn(ThreadPoolScheduler.Instance)
            .Publish();
        var o1 = obs.Subscribe(x => Console.WriteLine("Next: {0}", x), x => Console.WriteLine("Error: {0}", x), () => Console.WriteLine("Completed"));
        var connection = obs.Connect();
        Console.WriteLine("Press key to cancel");
        Console.ReadLine();
        connection.Dispose();
        Console.ReadLine();

立即按Enter键输出:

Press key to cancel
Next: 1

处理连接会处理所有订阅(内部)。这是Rx的抵消模型。当一个订阅被释放时,观察者不再接收任何类型的通知。

参见处方设计指南文件中的§4.4。

如果您希望在观察员被取消时收到通知,请使用Finally运算符。请注意,它通常适用于终止,因此您的操作也会在完成或失败时调用。

var o1 = obs.Finally(() => Console.WriteLine("Terminated")).Subscribe(...);

更新:

仔细阅读我的答案,我意识到我并不完全清楚。Finally运算符仅在您自己处理订阅时调用。处理连接只会处理对已发布的可观察对象的内部订阅,使您的订阅处于"活动"状态。原因是您可以随时重新连接已发布的可观察对象,并且您的订阅将继续收到通知。因此,o1实际上根本没有被取消。

此外,无论如何都不能调用OnCompleted,因为这样,通过简单地重新连接可观察对象,就可以在同一订阅上多次调用它,这当然违反了Rx语法。参见处方设计指南中的§4.1

更新2:

正如评论中所提到的,虽然OnCompleted在取消时不被调用,但它是在可观察到的成功终止时调用的;然而,观察者将来将不再接收任何通知(以满足Rx语法),即使在随后的重新连接时也是如此。此外,由于序列已终止,所有Finally运算符都将已执行。

更新3:回答

在取消可连接的可观测值时,当然有可能让OnCompleted调用每个观测者,尽管我不建议这样做,因为这非常奇怪,因为它违背了Rx中的原则,但遗憾的是,我必须提供它来从技术上回答最初的问题。该解决方案的关键是使用Subject<T>以及在呼叫Publish之前插入的Finally运算符。

请注意,我对代码进行了轻微的更改,以使用异步迭代器并删除错误处理(请参阅下面的注释)。

using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading.Tasks;
namespace RxLabs.Net45
{
  class PublishFinallyLab
  {
    public static void Main()
    {
      var obsOrCancellation = new Subject<int>();
      var obs = Observable.Create<int>(
            async (observer, cancel) =>
            {
              observer.OnNext(1);
              await Task.Delay(TimeSpan.FromSeconds(2), cancel).ConfigureAwait(false);
              if (!cancel.IsCancellationRequested)
              {
                observer.OnNext(2);
              }
            })
            .Finally(obsOrCancellation.OnCompleted)
            .Publish();
      obs.Subscribe(obsOrCancellation);
      var o1 = obsOrCancellation
            .Finally(() => Console.WriteLine("Finally!"))
            .Subscribe(
              x => Console.WriteLine("Next: {0}", x),
              ex => Console.WriteLine("Error: {0}", ex),
              () => Console.WriteLine("Completed"));
      do
      {
        using (var connection = obs.Connect())
        {
          Console.WriteLine("Press any key to cancel.");
          Console.ReadKey();
        }
        Console.WriteLine("Press any key to continue.");
        Console.ReadKey();
      }
      while (true);
    }
  }
}

--

在一个无关但重要的方面,捕捉观察者抛出的异常是一种反模式。不要这样做。你应该完全删除try..catch。在可观察到的对非观察者代码的调用中没有可能抛出的代码,因此根本不应该调用OnError

相关内容

  • 没有找到相关文章

最新更新