RX for.NET-调用observer时.OnCompleted来自Disposable.创建时不会发生任何事情



我正在玩C#中.NET的RX库。有人能向我解释一下为什么"observer.OnCompleted()"方法在以下代码中什么都不做吗:

var observableStream = Observable.Create<CustomMessage>(
(observer) =>
{
CustomMessage cm = new CustomMessage();
CustomMessage.Subscribe(observer.OnNext);
return Disposable.Create(
() =>
{
Console.WriteLine("Disposing...");
CustomMessage.Unsubscribe(observer.OnNext);                          
observer.OnCompleted();                //***Nothing happens here***
}
);
});
//IObserver.OnException()
public override void OnException(Exception e)
{
Console.WriteLine("Exception occurred - " + e.Message);
}
//IObserver.OnComplete()
public override void OnUnsubscribe()
{
Console.WriteLine("Unsubscribed...");            
}
//IObserver.OnNext()
public override void HandleNextMsg(IRVMessage msg)
{
Console.WriteLine("Instance received a message");
}
IDisposable myDisposable = observableStream.Subscribe(HandleNextMsg, OnException, OnUnsubscribe);
//At some later point....
myDisposable.Dispose();

该代码旨在订阅CustomMessages流。它记录观察者。设置订阅时使用我的CustomMessage类型的OnNext()方法。然后,它取消对观测者的注册。处理订阅时使用OnNext()。所有这些都是正确的。每当收到CustomMessage时,都会调用我的"HandleNextMsg()"方法。

稍后,当我想终止订阅时,我会调用"Dispose()",并成功执行以下两行:

Console.WriteLine("Disposing...");
CustomMessage.Unsubscribe(observer.OnNext); 

然后我就不会再收到自定义消息了。然而,以下行虽然执行了,但什么也没做:

observer.OnCompleted(); 

我本以为它会打电话给

Console.WriteLine("Unsubscribed...");

在某个时刻,observer和"OnUnsubscribe"方法之间的连接丢失了,我想了解到底发生了什么。为什么"observer.OnNext()"可以成功注销,但"observer.OnCompleted()"什么都不做?

有人向我指出,仅仅因为我正在处理流并不意味着我应该调用"OnCompleted()",但我仍然想理解为什么它不起作用。

OnCompleted()用于通知订阅者上游序列(在您的情况下为CustomMessage)已经结束。这并不意味着确认订阅者请求的取消订阅已经成功,这似乎是你试图使用它的方式。OnCompleted()是关于序列的通知,适用于所有订阅者,而不是针对该序列的单个订阅。

换句话说,您不应该希望在Dispose中调用它。毕竟,订户自己在进行退订,为什么需要通知它?

至于什么都没发生的实际技术原因,我猜回调(按设计)在你已经处理好的时候不会持续下去。只是一个理论,它不是很相关。

您看到的问题是由Observable.Create封装您传递的函数以及订阅生成的IObservable的任何观察程序的方式引起的。基本上,流程是:

  • Observable.Create返回AnonymousObservable
  • AnonymousObservable将观察者封装在Subscribe中的AutoDetachObserver中
  • AnonymousObservable从Subscribe返回AutoDetachObserver(实现IDisposable)
  • AutoDetachObserver。Dispose设置其停止标志,然后处理原始订阅函数返回的对象。此标志导致观察器忽略将来对OnError和OnCompleted的调用,从而导致不调用封装的观察器方法

这个答案是基于RX的v1.x,但我预计这在v2.0中没有改变。

如果您有一些代码需要运行,无论订阅如何结束(OnError、OnCompleted或Dispose),我建议您使用Observable.Finally。

最新更新