RX:测试样本间隔开关管道



我有一些RX代码来测试事件流上的不活动,流上的活动重置了一个间隔,这是不活动触发器。

public interface IReportActivity
{
    event EventHandler Activity;
}
public interface IInactivityMonitor
{
    IObservable<Unit> ObserveInactivity(TimeSpan inactivityTimeout);
}
public class InactivityMonitor : IInactivityMonitor
{
    private readonly ISchedulerProvider _schedulerProvider;
    private readonly IReportActivity _activitySource;
    private IObservable<Unit> _inactivityObservable;
    public InactivityMonitor(IRaiseActivity activitySource, ISchedulerProvider schedulerProvider)
    {
        _activitySource = activitySource;
        _schedulerProvider = schedulerProvider;
    }
    public IObservable<Unit> ObserveInactivity(TimeSpan inactivityTimeout)
    {
        return GetInactivityObservable()
            .Select(_ => Observable.Interval(inactivityTimeout, _schedulerProvider.NewThread)
                    .Timestamp()
                    .Select(__ => Unit.Default))
            .Switch();
    }
    public IObservable<Unit> GetInactivityObservable()
    {
        return _inactivityObservable = _inactivityObservable ??
            Observable.FromEventPattern<EventHandler<EventArgs>, EventArgs>(
                h => _activitySource.Activity += h,
                h => _activitySource.Activity -= h)
                .Sample(TimeSpan.FromSeconds(1), _schedulerProvider.NewThread)
                .Select(_ => Unit.Default)
                .Publish()
                .RefCount();
    }
}

代码按要求工作(尽管我认为每秒重新创建间隔是超过kill - ObserveInactivity应该在超时之前采样,并根据最后一个活动的时间戳重置)

我真正的问题是试着测试这段代码。

[TestFixture]
public class InactivityMonitorTests
{
    private TestSchedulers _testSchedulers;
    private InactivityMonitor _sut;
    private AutoMock _moqqer;
    protected override void Setup()
    {
        base.Setup();
        _moqqer = new AutoMock()
        _testSchedulers = new TestSchedulers();
        _moqqer.Use<ISchedulerProvider>(_testSchedulers);
        _sut = Moqqer.CreateInstance<InactivityMonitor>();
    }
    // this test passes
    [Test]
    public void GetInactivityObservable_ActivityDetected_ReportsActivity()
    {
        var activityObserved = false;
        _sut.GetInactivityObservable()
           .Subscribe(x => activityObserved = true);
        RaiseActivityEvent();
        _testSchedulers.NewThread.AdvanceBy(TimeSpan.FromSeconds(11).Ticks);
        activityObserved.Should().BeTrue();
    }
    private void RaiseActivityEvent()
    {
        _moqqer.GetMock<IReportActivty>()
          .Raise(m => m.Activity += null, EventArgs.Empty);
    }
    // this test fails, The interval never appears to get set up via the tests.
    [Test]
    public void ObserveActivity_InactivtyTimeoutExceeded_NotificationReceived()
    {
       var inactivityObserved = false;
        _sut.ObserveInactivity(TimeSpan.FromSeconds(10))
           .Subscribe(x => inactivityObserved = true);
       _testSchedulers.NewThread.AdvanceBy(TimeSpan.FromSeconds(11).Ticks);
       inactivityObserved.Should().BeTrue();
    }
}
public interface ISchedulerProvider
{
    IScheduler CurrentThread { get; }
    IScheduler Dispatcher { get; }
    IScheduler Immediate { get; }
    IScheduler NewThread { get; }
    IScheduler ThreadPool { get; }
    IScheduler TaskPool { get; } 
}
public sealed class TestSchedulers : ISchedulerProvider
{
    private readonly TestScheduler _currentThread = new TestScheduler();
    private readonly TestScheduler _dispatcher = new TestScheduler();
    private readonly TestScheduler _immediate = new TestScheduler();
    private readonly TestScheduler _newThread = new TestScheduler();
    private readonly TestScheduler _threadPool = new TestScheduler();
    private readonly TestScheduler _taskPool = new TestScheduler();
    #region Explicit implementation of ISchedulerService
    IScheduler ISchedulerProvider.CurrentThread { get { return _currentThread; } }
    IScheduler ISchedulerProvider.Dispatcher { get { return _dispatcher; } }
    IScheduler ISchedulerProvider.Immediate { get { return _immediate; } }
    IScheduler ISchedulerProvider.NewThread { get { return _newThread; } }
    IScheduler ISchedulerProvider.ThreadPool { get { return _threadPool; } }
    IScheduler ISchedulerProvider.TaskPool { get { return _taskPool; } }
    #endregion
    public TestScheduler CurrentThread { get { return _currentThread; } }
    public TestScheduler Dispatcher { get { return _dispatcher; } }
    public TestScheduler Immediate { get { return _immediate; } }
    public TestScheduler NewThread { get { return _newThread; } }
    public TestScheduler ThreadPool { get { return _threadPool; } }
    public TestScheduler TaskPool { get { return _taskPool; } }
}

我尝试了各种方法来启动调度程序,在订阅之前和之后,尝试在另一个调度程序上订阅并在订阅之前启动,但没有成功。

编辑:调试显示间隔从未在ObserveInactivity方法中创建。

希望有人能给我指个方向。

谢谢

我会重新组织您的非活动查询,以便不是从_activitySource.Activity触发,然后添加时间,而是期望在XXX期间失败/沉默并为此发出通知。然后升级该查询,以便在有任何活动时取消/重新启动。

。而不是

public IObservable<Unit> GetInactivityObservable()
{
    return _inactivityObservable = _inactivityObservable ??
        Observable.FromEventPattern<EventHandler<EventArgs>, EventArgs>(
            h => _activitySource.Activity += h,
            h => _activitySource.Activity -= h)
            .Sample(TimeSpan.FromSeconds(1), _schedulerProvider.NewThread)
            .Select(_ => Unit.Default)
            .Publish()
            .RefCount();
}

改成

public IObservable<Unit> GetInactivityObservable()
{
    return _inactivityObservable = _inactivityObservable ??
        Observable.FromEventPattern<EventHandler<EventArgs>, EventArgs>(
            h => _activitySource.Activity += h,
            h => _activitySource.Activity -= h)
            .Select(_=>Unit.Default)
            .StartWith(Unit.Default)
            .Sample(TimeSpan.FromSeconds(1), _schedulerProvider.NewThread)
            .Select(a=>Observable.Timer(TimeSpan.FromSeconds(1), _schedulerProvider.NewThread).Select(_=>Unit.Default))
            .Switch()
            .Publish()
            .RefCount();
}

然后,由于Rx已经被惰性求值了,我将这个方法设为静态且无副作用,并在构造函数中调用它,或者只是内联它。

public class InactivityMonitor : IInactivityMonitor
{
    private readonly ISchedulerProvider _schedulerProvider;
    private readonly IObservable<Unit> _inactivityObservable;
    private static readonly TimeSpan SilencePeriod = TimeSpan.FromSeconds(1);
public InactivityMonitor(IRaiseActivity activitySource, ISchedulerProvider schedulerProvider)
{
    _schedulerProvider = schedulerProvider;
    _inactivityObservable = Observable.FromEventPattern<EventHandler<EventArgs>, EventArgs>(
            h => _activitySource.Activity += h,
            h => _activitySource.Activity -= h)
            .StartWith(null)
            .Select(a=>Observable.Timer(TimeSpan.FromSeconds(1), _schedulerProvider.NewThread))
            .Switch()
            .Select(_=>Unit.Default)
            .Publish()
            .RefCount();
}
//...

}

所以,有机会回到这个问题,我看到了错误在我的方式。

可观测。Interval从未被启动,因为没有活动被报告(由错误命名的inactivity可观察到)。运行应用程序很可能至少会引发1个事件。为了解决这个问题,我添加了.StartWith(Unit.Default),如下所示,这允许每个观察者立即启动超时,这显然是原始实现中的一个潜在错误。

public IObservable<Unit> ObserveInactivity(TimeSpan inactivityTimeout)
{
    return GetInactivityObservable()
        .StartWith(Unit.Default)  // <--
        .Select(_ => Observable.Interval(inactivityTimeout, _schedulerProvider.NewThread)
                .Timestamp()
                .Select(__ => Unit.Default))
        .Switch();
}

相关内容

  • 没有找到相关文章