如何防止通过 Rx 测试挂起?



我正在用下面的简化测试用例重现我的 Rx 问题。下面的测试挂起。我确信这是一个很小但基本的东西,我错过了,但不能把我的手指放在上面。

public class Service
{
private ISubject<double> _subject = new Subject<double>();
public void Reset()
{
_subject.OnNext(0.0);
}
public IObservable<double> GetProgress()
{
return _subject;
}
}
public class ObTest
{
[Fact]
private async Task SimpleTest()
{
var service = new Service();
var result = service.GetProgress().Take(1);
var task = Task.Run(async () =>
{
service.Reset();
});
await result;
}
}

更新

我上面的尝试是稍微简化问题并理解它。就我而言GetProgress()是发布下载进度的各种Observables的合并,其中一个Observables是每次有人调用方法来删除下载时都会发布0Subject<double>

EnigmativityTheodor Zoulias确定的竞争条件可能发生在现实生活中。我显示一个试图获得进度的视图,但是,快速手指及时删除了它。

我需要了解更多的是,如果下载再次开始(订阅现在已经发生,通过显示一个视图,已经进行了订阅(,并且有人再次删除它。

public class Service
{
private ISubject<double> _deleteSubject = new Subject<double>();
public void Reset()
{
_deleteSubject.OnNext(0.0);
}
public IObservable<double> GetProgress()
{
return _deleteSubject.Merge(downloadProgress);
}
}

您的代码没有挂起。它正在等待一个有时永远不会获得值的可观察量。

您有竞争条件。

Task.Run有时会在await result创建对可观察对象的订阅之前执行完成 - 因此它永远不会看到该值。

请尝试以下代码:

private async Task SimpleTest()
{
var service = new Service();
var result = service.GetProgress().Take(1);
var awaiter = result.GetAwaiter();
var task = Task.Run(() =>
{
service.Reset();
});
await awaiter;
}

await result创建对可观察量的订阅。问题是通知_subject.OnNext(0.0)可能发生在此订阅之前,在这种情况下,该值将在未观察到的情况下传递,并且await result将永远继续等待通知。在这个特定示例中,通知总是被遗漏,至少在我的 PC 中是这样,因为订阅延迟了大约 30 毫秒(用Stopwatch测量(,这比重置服务的任务完成所需的时间要长,可能是因为 JITer 必须加载和编译一些与 RX 相关的程序集。当我在运行示例之前通过调用new Subject<int>().FirstAsync().Subscribe()进行预热时,情况会发生变化。在这种情况下,几乎总是会遵守通知,并避免挂起。

我可以想到这个问题的两个强有力的解决方案。

  1. Enigmativity 建议的解决方案是在启动重置服务的任务之前创建可等待的订阅。这可以通过GetAwaiterToTask来完成。

  2. 使用ReplaySubject<T>而不是普通的香草Subject<T>.

表示既是可观察序列又是观察者的对象。每个通知都会广播给所有订阅和未来的观察者,但须遵守缓冲区修整策略。

ReplaySubject将缓存该值,以便将来的订阅可以观察该值,从而消除争用条件。可以使用 1bufferSize对其进行初始化,以最大程度地减少缓冲区的内存占用。

相关内容

  • 没有找到相关文章

最新更新