TestScheduler 创建的 ColdObserver 在 OnDone 时不会取消订阅



我想检查一下我创建的IObservable是否尊重"一旦我完成,我将取消订阅你">的礼貌。乍一看,我的代码似乎出了点问题。但是消除我的代码,并且仅使用TestScheduler提供的Observable和Observer,看起来"取消订阅"永远不会发生:

using Microsoft.Reactive.Testing;
using System.Reactive;
...
var ts = new TestScheduler();
var ob = ts.CreateObserver<int>();
var xs = ts.CreateColdObservable<int>(
    new Recorded<Notification<int>>(1, Notification.CreateOnCompleted<int>())
    );
xs.Subscribe(ob);
ts.AdvanceTo(2);
Assert.Equal(1, xs.Subscriptions.Single().Unsubscribe); //<-- Xunit no like

我最初怀疑观察者,但我在这里找到的代码变体上尝试过,它可以工作,所以现在我认为ColdObservable上的Subscribe实现行为不正确。

不存在这样的礼貌。第 4.3 节中的 RX 设计指南建议您可以:

假设资源在 OnErrorOnDone 消息之后被清理。

在第 4.4 节中,您可以:

假设尽最大努力停止所有未完成的退订工作

这些准则("礼貌"(谈到运营商尽快释放自己的资源以及已获得的任何资源。

在代码中,你不会针对这两种方案进行测试。ITestableObservable上的 Unsubscribe 属性的目的是报告观察者取出的订阅何时显式释放,而不是发生内部清理的时间 - 但你没有存储此句柄以便能够释放它:

xs.Subscribe(ob); /* return of handle ignored here */
因此,您

试图断言您处置了丢弃的订阅,而不是您订阅的可观察量清理了它可能已取出的任何订阅和资源。

如果想看看 4.3/4.4 资源及时清理的效果,写一个像这样的扩展方法:

public static IObservable<T> SpyResourceCleanUp<T>(
    this IObservable<T> source, IScheduler scheduler)
{
    return Observable.Create<T>(obs =>
    {
        var subscription = source.Subscribe(obs);
        return new CompositeDisposable(
            subscription,
            Disposable.Create(() => Console.WriteLine(
                "Clean up performed at " + scheduler.Now.Ticks)));
    });
}

并更换您的线路:

xs.Subscribe(ob);

xs.SpyResourceCleanUp(ts).Subscribe(ob);

(部分评论编辑(

正如我所期望的那样,在您的测试中,我看到了立即的资源清理。通过此更改,您的测试现在将通过,因为只要 OnCompletes(( 本身遵守准则的 4.3,SpyResourceCleanUp就会取消订阅它的父级 (xs(。

这里可能不明显的是,一旦释放订阅或在观察器上调用了OnComplete()OnError()Observable.Create就会处理调用返回IDisposableDispose()方法。这就是Create帮助您实现第 4.3 节的方式,以及测试通过更改后的代码的原因。

在幕后,Create返回的AnonymousObservable<T> : ObservableBase<T>的订阅由一个AutoDetachObserver包装,如下所示。

即您从Observable.Create返回的Disposable不是调用者得到的 - 他们得到一个包装的版本,该版本将在流终止或取消时调用您的 Dispose((。

相关内容

  • 没有找到相关文章

最新更新