使用 TestSchedulers、Rx 和 BlockingCollection 进行死锁测试



我有以下类,它基本上订阅了一个int可观察量并将值乘以 2。出于现实目的,我添加了一个 Thread.Sleep 来模拟繁重的处理。

public class WorkingClass
{
private BlockingCollection<int> _collection = new BlockingCollection<int>(1);
public WorkingClass(IObservable<int> rawValues)
{
rawValues.Subscribe(x => _collection.Add(x));
}
public IObservable<int> ProcessedValues()
{
return Observable.Create<int>(observer =>
{
while (true)
{
int value;
try
{
value = _collection.Take();
}
catch (Exception ex)
{
observer.OnError(ex);
break;
}
Thread.Sleep(1000); //Simulate long work
observer.OnNext(value * 2);
}
return Disposable.Empty;
});
}
}

我在测试它时遇到了问题,在下面的测试中,我只想断言,如果源流发出值 1,SUT 将发出值 2:

[Test]
public void SimpleTest()
{
var sourceValuesScheduler = new TestScheduler();
var newThreadScheduler = new TestScheduler();
var source = sourceValuesScheduler.CreateHotObservable(
new Recorded<Notification<int>>(1000, Notification.CreateOnNext(1)));
var sut = new WorkingClass(source);
var observer = sourceValuesScheduler.CreateObserver<int>();
sut.ProcessedValues()
.SubscribeOn(newThreadScheduler) //The cold part (i.e, the while loop) of the ProcessedValues Observable should run in a different thread
.Subscribe(observer);
sourceValuesScheduler.AdvanceTo(1000);
observer.Messages.AssertEqual(new Recorded<Notification<int>>(1000, Notification.CreateOnNext(2)));
}

如果我运行此测试,断言将失败,因为 newThreadScheduler 从未启动,因此从未创建 ProcessingValues observable。如果我这样做:

sourceValuesScheduler.AdvanceTo(1000);
newThreadScheduler.AdvanceTo(1000); 

它也不起作用,因为newThreadScheduler使用与sourceValuesScheduler相同的线程,因此测试将在发出处理后的值后立即挂起,在以下行:

value = _collection.Take();

有没有办法让多个测试调度程序在不同的线程上运行?否则我怎么能测试这样的类呢?

Take()

块,直到有项目要从BlockingCollection<int>中删除或您调用CompleteAdding()

鉴于您当前的实现,您订阅ProcessedValues()并执行while循环的线程将永远不会完成。

您应该在单独的线程上使用BlockingCollection<int>。例如,您可以在调用ProcessedValues()时创建消费Task。考虑以下实现,它也释放了BlockingCollection<int>

public sealed class WorkingClass : IDisposable
{
private BlockingCollection<int> _collection = new BlockingCollection<int>(1);
private List<Task> _consumerTasks = new List<Task>();
public WorkingClass(IObservable<int> rawValues)
{
rawValues.Subscribe(x => _collection.Add(x));
}
public IObservable<int> ProcessedValues()
{
return Observable.Create<int>(observer =>
{
_consumerTasks.Add(Task.Factory.StartNew(() => Consume(observer), TaskCreationOptions.LongRunning));
return Disposable.Empty;
});
}
private void Consume(IObserver<int> observer)
{
try
{
foreach (int value in _collection.GetConsumingEnumerable())
{
Thread.Sleep(1000); //Simulate long work
observer.OnNext(value * 2);
}
}
catch (Exception ex)
{
observer.OnError(ex);
}
}
public void Dispose()
{
_collection.CompleteAdding();
Task.WaitAll(_consumerTasks.ToArray());
_collection.Dispose();
}
}

可以像使用以下代码一样对其进行测试:

var sourceValuesScheduler = new TestScheduler();
var source = sourceValuesScheduler.CreateHotObservable(
new Recorded<Notification<int>>(1000, Notification.CreateOnNext(1)));
var observer = sourceValuesScheduler.CreateObserver<int>();
using (var sut = new WorkingClass(source))
{
sourceValuesScheduler.AdvanceTo(1000); //add to collection
sut.ProcessedValues().Subscribe(observer); //consume
} //...and wait until the loop exists
observer.Messages.AssertEqual(new Recorded<Notification<int>>(1000, Notification.CreateOnNext(2)));

相关内容

  • 没有找到相关文章

最新更新