我想检查一下我创建的IObservable
是否尊重"一旦我完成,我将取消订阅你">的礼貌。乍一看,我的代码似乎出了点问题。但是消除我的代码,并且仅使用TestScheduler提供的Observable和Observer,看起来"取消订阅"永远不会发生:
using Microsoft.Reactive.Testing;
using System.Reactive;
...
var ts = new TestScheduler();
var ob = ts.CreateObserver<int>();
var xs = ts.CreateColdObservable<int>(
new Recorded<Notification<int>>(1, Notification.CreateOnCompleted<int>())
);
xs.Subscribe(ob);
ts.AdvanceTo(2);
Assert.Equal(1, xs.Subscriptions.Single().Unsubscribe); //<-- Xunit no like
我最初怀疑观察者,但我在这里找到的代码变体上尝试过,它可以工作,所以现在我认为ColdObservable
上的Subscribe
实现行为不正确。
不存在这样的礼貌。第 4.3 节中的 RX 设计指南建议您可以:
假设资源在 OnError 或 OnDone 消息之后被清理。
在第 4.4 节中,您可以:
假设尽最大努力停止所有未完成的退订工作
这些准则("礼貌"(谈到运营商尽快释放自己的资源以及已获得的任何资源。
在代码中,你不会针对这两种方案进行测试。ITestableObservable
上的 Unsubscribe
属性的目的是报告观察者取出的订阅何时显式释放,而不是发生内部清理的时间 - 但你没有存储此句柄以便能够释放它:
xs.Subscribe(ob); /* return of handle ignored here */
因此,您试图断言您处置了丢弃的订阅,而不是您订阅的可观察量清理了它可能已取出的任何订阅和资源。
如果想看看 4.3/4.4 资源及时清理的效果,写一个像这样的扩展方法:
public static IObservable<T> SpyResourceCleanUp<T>(
this IObservable<T> source, IScheduler scheduler)
{
return Observable.Create<T>(obs =>
{
var subscription = source.Subscribe(obs);
return new CompositeDisposable(
subscription,
Disposable.Create(() => Console.WriteLine(
"Clean up performed at " + scheduler.Now.Ticks)));
});
}
并更换您的线路:
xs.Subscribe(ob);
跟
xs.SpyResourceCleanUp(ts).Subscribe(ob);
(部分评论编辑(
正如我所期望的那样,在您的测试中,我看到了立即的资源清理。通过此更改,您的测试现在将通过,因为只要 OnCompletes(( 本身遵守准则的 4.3,SpyResourceCleanUp
就会取消订阅它的父级 (xs(。
这里可能不明显的是,一旦释放订阅或在观察器上调用了OnComplete()
或OnError()
,Observable.Create
就会处理调用返回IDisposable
的Dispose()
方法。这就是Create
帮助您实现第 4.3 节的方式,以及测试通过更改后的代码的原因。
在幕后,Create
返回的AnonymousObservable<T> : ObservableBase<T>
的订阅由一个AutoDetachObserver
包装,如下所示。
即您从Observable.Create
返回的Disposable
不是调用者得到的 - 他们得到一个包装的版本,该版本将在流终止或取消时调用您的 Dispose((。