. net Rx Koans:为什么这个测试用例失败了?



在寻找学习Rx的材料时,我发现了这个:反应性扩展(Rx) Koans。来自引言:

"公案"的定义
Kōans是一个禅宗词汇,意思是一个人的启蒙或觉醒,通常是通过谜题或谜语。最常见的问题是"一只手拍手的声音是什么?"

它由许多简短的测试用例组成,这些测试用例教授Rx的不同方面。

其中一个应该直观地通过,然而,它失败了。你能解释一下吗?

全文如下:

    [TestMethod]
    [Timeout(___)] //"Fill in the blanks" - I tried several values, e.g. 4000. No changes.
    public void AsynchronousRunInParallel()
    {
        Func<int, int> inc = (int x) =>
                                {
                                    // I set a breakpoint here and it's never hit.
                                    Thread.Sleep(1500);
                                    return x + 1;
                                };
        double result = 0;
        var incAsync = Observable.FromAsyncPattern<int, int>(inc.BeginInvoke,
                                                             inc.EndInvoke);
        incAsync(1).Merge(incAsync(9)).Sum()
                               .SubscribeOn(Scheduler.Immediate)
                               .Subscribe(n => result = n);
        Assert.AreEqual(12, result);
                    //the failing message says: 'expected 12, got 0'
    }

简短回答:

在检查结果之前,您没有给测试的异步部分足够的时间来执行。

长答:

这个测试执行的操作顺序有点像:

  • 设置异步调用委托的IObservable
  • 链接到另一个IObservable,这是两个异步调用的合并结果,加在一起
  • Subscribe到结果IObservable,导致对
  • 方法的两次异步调用
  • 哎!在委托中有一个Thread.Sleep,所以异步调用被阻塞了!
  • 立即检查结果,当然是0 -两个被阻塞的异步调用还没有"完成"

有很多方法可以"修复"这个问题:

  • 删除Thread.Sleep
  • 通过更改BeginInvoke将调用更改为同步,尽管这将需要对测试进行整体重组
  • HistoricalScheduler代替Immediate

强烈建议在尝试对Rx进行单元测试时使用HistoricalScheduler -基本上,它允许您在虚拟时间中向前和向后跳跃,这是测试时间相关代码(如Rx查询)的关键特性:

var theTardis = new HistoricalScheduler();
Func<int, int> inc = (int x) =>
{
    theTardis.Sleep(TimeSpan.FromMilliseconds(1500));
    return x + 1;
};
double result = 0;
var incAsync = Observable.FromAsyncPattern<int, int>(inc.BeginInvoke,inc.EndInvoke);
incAsync(1).Merge(incAsync(9)).Sum()
    .SubscribeOn(theTardis)
    .Subscribe(n => result = n);
// To the FUTURE!
theTardis.AdvanceBy(TimeSpan.FromSeconds(5));
Assert.AreEqual(12, result);

同步版本看起来是这样的——您所拥有的最直接的版本。Single()将阻塞,直到可观察对象完成。阻塞通常是你想要避免的,但如果你只是在胡闹,那也没关系。

public void AsynchronousRunInParallel()
{
    Func<int, int> inc = (int x) =>
    {
        Thread.Sleep(1500);
        return x + 1;
    };
    var incAsync = Observable.FromAsyncPattern<int, int>(inc.BeginInvoke,
                                                         inc.EndInvoke);
    int sum = incAsync(1).Merge(incAsync(9)).Sum().Single();
    Assert.AreEqual(12, sum);
}

和异步TPL版本,使用await:

public async Task AsynchronousRunInParallel()
{
    Func<int, int> inc = (int x) =>
    {
        Thread.Sleep(1500);
        return x + 1;
    };
    var incAsync = Observable.FromAsyncPattern<int, int>(inc.BeginInvoke,
                                                         inc.EndInvoke);
    int sum = await incAsync(1).Merge(incAsync(9)).Sum();
    Assert.AreEqual(12, sum);
}

最后是使用Rx Do()的异步操作,如果这是更大操作的一部分,那就好了:

public async Task AsynchronousRunInParallel()
{
    Func<int, int> inc = (int x) =>
    {
        Thread.Sleep(1500);
        return x + 1;
    };
    var incAsync = Observable.FromAsyncPattern<int, int>(inc.BeginInvoke,
                                                         inc.EndInvoke);
    await incAsync(1).Merge(incAsync(9)).Sum().Do(sum =>
    {
        Assert.AreEqual(12, sum);
    });
}

在我看来,这是源代码中的一个bug,而AsynchronousRunInParallel缺少等待结果,例如在thebloodyhardasyncinvokationpattern中使用。

当使用FromAsyncPattern时,只有BeginInvoke在当前线程中同步执行。实际的工作和结果处理将被安排在ThreadPool上。因此,只要安排了2个异步增量,就会立即开始执行Assert,而不会等待完成。

我说:

ThreadUtils.WaitUntil(() => result != 0.0);

所以结果看起来像:

        incAsync(1).Merge(incAsync(9)).
            Sum().SubscribeOn(Scheduler.Immediate).
            Subscribe(n => result = n);
        ThreadUtils.WaitUntil(() => result != 0.0);
        Assert.AreEqual(12, result);

或者您可以将"订阅"替换为"运行"。它是Koan本身的一个助手方法,它将使用手动事件等待:

incAsync(1).Merge(incAsync(9)).
             Sum().SubscribeOn(Scheduler.Immediate).
             Run(n => result = n);
Assert.AreEqual(12, result);

相关内容

  • 没有找到相关文章

最新更新