我有一些RX代码来测试事件流上的不活动,流上的活动重置了一个间隔,这是不活动触发器。
public interface IReportActivity
{
event EventHandler Activity;
}
public interface IInactivityMonitor
{
IObservable<Unit> ObserveInactivity(TimeSpan inactivityTimeout);
}
public class InactivityMonitor : IInactivityMonitor
{
private readonly ISchedulerProvider _schedulerProvider;
private readonly IReportActivity _activitySource;
private IObservable<Unit> _inactivityObservable;
public InactivityMonitor(IRaiseActivity activitySource, ISchedulerProvider schedulerProvider)
{
_activitySource = activitySource;
_schedulerProvider = schedulerProvider;
}
public IObservable<Unit> ObserveInactivity(TimeSpan inactivityTimeout)
{
return GetInactivityObservable()
.Select(_ => Observable.Interval(inactivityTimeout, _schedulerProvider.NewThread)
.Timestamp()
.Select(__ => Unit.Default))
.Switch();
}
public IObservable<Unit> GetInactivityObservable()
{
return _inactivityObservable = _inactivityObservable ??
Observable.FromEventPattern<EventHandler<EventArgs>, EventArgs>(
h => _activitySource.Activity += h,
h => _activitySource.Activity -= h)
.Sample(TimeSpan.FromSeconds(1), _schedulerProvider.NewThread)
.Select(_ => Unit.Default)
.Publish()
.RefCount();
}
}
代码按要求工作(尽管我认为每秒重新创建间隔是超过kill - ObserveInactivity应该在超时之前采样,并根据最后一个活动的时间戳重置)
我真正的问题是试着测试这段代码。
[TestFixture]
public class InactivityMonitorTests
{
private TestSchedulers _testSchedulers;
private InactivityMonitor _sut;
private AutoMock _moqqer;
protected override void Setup()
{
base.Setup();
_moqqer = new AutoMock()
_testSchedulers = new TestSchedulers();
_moqqer.Use<ISchedulerProvider>(_testSchedulers);
_sut = Moqqer.CreateInstance<InactivityMonitor>();
}
// this test passes
[Test]
public void GetInactivityObservable_ActivityDetected_ReportsActivity()
{
var activityObserved = false;
_sut.GetInactivityObservable()
.Subscribe(x => activityObserved = true);
RaiseActivityEvent();
_testSchedulers.NewThread.AdvanceBy(TimeSpan.FromSeconds(11).Ticks);
activityObserved.Should().BeTrue();
}
private void RaiseActivityEvent()
{
_moqqer.GetMock<IReportActivty>()
.Raise(m => m.Activity += null, EventArgs.Empty);
}
// this test fails, The interval never appears to get set up via the tests.
[Test]
public void ObserveActivity_InactivtyTimeoutExceeded_NotificationReceived()
{
var inactivityObserved = false;
_sut.ObserveInactivity(TimeSpan.FromSeconds(10))
.Subscribe(x => inactivityObserved = true);
_testSchedulers.NewThread.AdvanceBy(TimeSpan.FromSeconds(11).Ticks);
inactivityObserved.Should().BeTrue();
}
}
public interface ISchedulerProvider
{
IScheduler CurrentThread { get; }
IScheduler Dispatcher { get; }
IScheduler Immediate { get; }
IScheduler NewThread { get; }
IScheduler ThreadPool { get; }
IScheduler TaskPool { get; }
}
public sealed class TestSchedulers : ISchedulerProvider
{
private readonly TestScheduler _currentThread = new TestScheduler();
private readonly TestScheduler _dispatcher = new TestScheduler();
private readonly TestScheduler _immediate = new TestScheduler();
private readonly TestScheduler _newThread = new TestScheduler();
private readonly TestScheduler _threadPool = new TestScheduler();
private readonly TestScheduler _taskPool = new TestScheduler();
#region Explicit implementation of ISchedulerService
IScheduler ISchedulerProvider.CurrentThread { get { return _currentThread; } }
IScheduler ISchedulerProvider.Dispatcher { get { return _dispatcher; } }
IScheduler ISchedulerProvider.Immediate { get { return _immediate; } }
IScheduler ISchedulerProvider.NewThread { get { return _newThread; } }
IScheduler ISchedulerProvider.ThreadPool { get { return _threadPool; } }
IScheduler ISchedulerProvider.TaskPool { get { return _taskPool; } }
#endregion
public TestScheduler CurrentThread { get { return _currentThread; } }
public TestScheduler Dispatcher { get { return _dispatcher; } }
public TestScheduler Immediate { get { return _immediate; } }
public TestScheduler NewThread { get { return _newThread; } }
public TestScheduler ThreadPool { get { return _threadPool; } }
public TestScheduler TaskPool { get { return _taskPool; } }
}
我尝试了各种方法来启动调度程序,在订阅之前和之后,尝试在另一个调度程序上订阅并在订阅之前启动,但没有成功。
编辑:调试显示间隔从未在ObserveInactivity方法中创建。
希望有人能给我指个方向。
谢谢
我会重新组织您的非活动查询,以便不是从_activitySource.Activity
触发,然后添加时间,而是期望在XXX期间失败/沉默并为此发出通知。然后升级该查询,以便在有任何活动时取消/重新启动。
。而不是
public IObservable<Unit> GetInactivityObservable()
{
return _inactivityObservable = _inactivityObservable ??
Observable.FromEventPattern<EventHandler<EventArgs>, EventArgs>(
h => _activitySource.Activity += h,
h => _activitySource.Activity -= h)
.Sample(TimeSpan.FromSeconds(1), _schedulerProvider.NewThread)
.Select(_ => Unit.Default)
.Publish()
.RefCount();
}
改成
public IObservable<Unit> GetInactivityObservable()
{
return _inactivityObservable = _inactivityObservable ??
Observable.FromEventPattern<EventHandler<EventArgs>, EventArgs>(
h => _activitySource.Activity += h,
h => _activitySource.Activity -= h)
.Select(_=>Unit.Default)
.StartWith(Unit.Default)
.Sample(TimeSpan.FromSeconds(1), _schedulerProvider.NewThread)
.Select(a=>Observable.Timer(TimeSpan.FromSeconds(1), _schedulerProvider.NewThread).Select(_=>Unit.Default))
.Switch()
.Publish()
.RefCount();
}
然后,由于Rx已经被惰性求值了,我将这个方法设为静态且无副作用,并在构造函数中调用它,或者只是内联它。
public class InactivityMonitor : IInactivityMonitor
{
private readonly ISchedulerProvider _schedulerProvider;
private readonly IObservable<Unit> _inactivityObservable;
private static readonly TimeSpan SilencePeriod = TimeSpan.FromSeconds(1);
public InactivityMonitor(IRaiseActivity activitySource, ISchedulerProvider schedulerProvider)
{
_schedulerProvider = schedulerProvider;
_inactivityObservable = Observable.FromEventPattern<EventHandler<EventArgs>, EventArgs>(
h => _activitySource.Activity += h,
h => _activitySource.Activity -= h)
.StartWith(null)
.Select(a=>Observable.Timer(TimeSpan.FromSeconds(1), _schedulerProvider.NewThread))
.Switch()
.Select(_=>Unit.Default)
.Publish()
.RefCount();
}
//...
}
所以,有机会回到这个问题,我看到了错误在我的方式。
可观测。Interval从未被启动,因为没有活动被报告(由错误命名的inactivity可观察到)。运行应用程序很可能至少会引发1个事件。为了解决这个问题,我添加了.StartWith(Unit.Default)
,如下所示,这允许每个观察者立即启动超时,这显然是原始实现中的一个潜在错误。
public IObservable<Unit> ObserveInactivity(TimeSpan inactivityTimeout)
{
return GetInactivityObservable()
.StartWith(Unit.Default) // <--
.Select(_ => Observable.Interval(inactivityTimeout, _schedulerProvider.NewThread)
.Timestamp()
.Select(__ => Unit.Default))
.Switch();
}