在寻找学习Rx的材料时,我发现了这个:反应性扩展(Rx) Koans。来自引言:
"公案"的定义
Kōans是一个禅宗词汇,意思是一个人的启蒙或觉醒,通常是通过谜题或谜语。最常见的问题是"一只手拍手的声音是什么?"
它由许多简短的测试用例组成,这些测试用例教授Rx的不同方面。
其中一个应该直观地通过,然而,它失败了。你能解释一下吗?
全文如下:
[TestMethod]
[Timeout(___)] //"Fill in the blanks" - I tried several values, e.g. 4000. No changes.
public void AsynchronousRunInParallel()
{
Func<int, int> inc = (int x) =>
{
// I set a breakpoint here and it's never hit.
Thread.Sleep(1500);
return x + 1;
};
double result = 0;
var incAsync = Observable.FromAsyncPattern<int, int>(inc.BeginInvoke,
inc.EndInvoke);
incAsync(1).Merge(incAsync(9)).Sum()
.SubscribeOn(Scheduler.Immediate)
.Subscribe(n => result = n);
Assert.AreEqual(12, result);
//the failing message says: 'expected 12, got 0'
}
简短回答:
在检查结果之前,您没有给测试的异步部分足够的时间来执行。
长答:
这个测试执行的操作顺序有点像:
- 设置异步调用委托的
IObservable
- 链接到另一个
IObservable
,这是两个异步调用的合并结果,加在一起 -
Subscribe
到结果IObservable
,导致对 方法的两次异步调用 - 哎!在委托中有一个
Thread.Sleep
,所以异步调用被阻塞了! - 立即检查结果,当然是0 -两个被阻塞的异步调用还没有"完成"
有很多方法可以"修复"这个问题:
- 删除
Thread.Sleep
- 通过更改
BeginInvoke
将调用更改为同步,尽管这将需要对测试进行整体重组 - 用
HistoricalScheduler
代替Immediate
强烈建议在尝试对Rx进行单元测试时使用HistoricalScheduler
-基本上,它允许您在虚拟时间中向前和向后跳跃,这是测试时间相关代码(如Rx查询)的关键特性:
var theTardis = new HistoricalScheduler();
Func<int, int> inc = (int x) =>
{
theTardis.Sleep(TimeSpan.FromMilliseconds(1500));
return x + 1;
};
double result = 0;
var incAsync = Observable.FromAsyncPattern<int, int>(inc.BeginInvoke,inc.EndInvoke);
incAsync(1).Merge(incAsync(9)).Sum()
.SubscribeOn(theTardis)
.Subscribe(n => result = n);
// To the FUTURE!
theTardis.AdvanceBy(TimeSpan.FromSeconds(5));
Assert.AreEqual(12, result);
同步版本看起来是这样的——您所拥有的最直接的版本。Single()
将阻塞,直到可观察对象完成。阻塞通常是你想要避免的,但如果你只是在胡闹,那也没关系。
public void AsynchronousRunInParallel()
{
Func<int, int> inc = (int x) =>
{
Thread.Sleep(1500);
return x + 1;
};
var incAsync = Observable.FromAsyncPattern<int, int>(inc.BeginInvoke,
inc.EndInvoke);
int sum = incAsync(1).Merge(incAsync(9)).Sum().Single();
Assert.AreEqual(12, sum);
}
和异步TPL版本,使用await
:
public async Task AsynchronousRunInParallel()
{
Func<int, int> inc = (int x) =>
{
Thread.Sleep(1500);
return x + 1;
};
var incAsync = Observable.FromAsyncPattern<int, int>(inc.BeginInvoke,
inc.EndInvoke);
int sum = await incAsync(1).Merge(incAsync(9)).Sum();
Assert.AreEqual(12, sum);
}
最后是使用Rx Do()
的异步操作,如果这是更大操作的一部分,那就好了:
public async Task AsynchronousRunInParallel()
{
Func<int, int> inc = (int x) =>
{
Thread.Sleep(1500);
return x + 1;
};
var incAsync = Observable.FromAsyncPattern<int, int>(inc.BeginInvoke,
inc.EndInvoke);
await incAsync(1).Merge(incAsync(9)).Sum().Do(sum =>
{
Assert.AreEqual(12, sum);
});
}
在我看来,这是源代码中的一个bug,而AsynchronousRunInParallel缺少等待结果,例如在thebloodyhardasyncinvokationpattern中使用。
当使用FromAsyncPattern时,只有BeginInvoke在当前线程中同步执行。实际的工作和结果处理将被安排在ThreadPool上。因此,只要安排了2个异步增量,就会立即开始执行Assert,而不会等待完成。
我说:
ThreadUtils.WaitUntil(() => result != 0.0);
所以结果看起来像:
incAsync(1).Merge(incAsync(9)).
Sum().SubscribeOn(Scheduler.Immediate).
Subscribe(n => result = n);
ThreadUtils.WaitUntil(() => result != 0.0);
Assert.AreEqual(12, result);
或者您可以将"订阅"替换为"运行"。它是Koan本身的一个助手方法,它将使用手动事件等待:
incAsync(1).Merge(incAsync(9)).
Sum().SubscribeOn(Scheduler.Immediate).
Run(n => result = n);
Assert.AreEqual(12, result);