我正在用下面的简化测试用例重现我的 Rx 问题。下面的测试挂起。我确信这是一个很小但基本的东西,我错过了,但不能把我的手指放在上面。
public class Service
{
private ISubject<double> _subject = new Subject<double>();
public void Reset()
{
_subject.OnNext(0.0);
}
public IObservable<double> GetProgress()
{
return _subject;
}
}
public class ObTest
{
[Fact]
private async Task SimpleTest()
{
var service = new Service();
var result = service.GetProgress().Take(1);
var task = Task.Run(async () =>
{
service.Reset();
});
await result;
}
}
更新
我上面的尝试是稍微简化问题并理解它。就我而言GetProgress()
是发布下载进度的各种Observables
的合并,其中一个Observables
是每次有人调用方法来删除下载时都会发布0
Subject<double>
。
Enigmativity和Theodor Zoulias确定的竞争条件可能发生在现实生活中。我显示一个试图获得进度的视图,但是,快速手指及时删除了它。
我需要了解更多的是,如果下载再次开始(订阅现在已经发生,通过显示一个视图,已经进行了订阅(,并且有人再次删除它。
public class Service
{
private ISubject<double> _deleteSubject = new Subject<double>();
public void Reset()
{
_deleteSubject.OnNext(0.0);
}
public IObservable<double> GetProgress()
{
return _deleteSubject.Merge(downloadProgress);
}
}
您的代码没有挂起。它正在等待一个有时永远不会获得值的可观察量。
您有竞争条件。
Task.Run
有时会在await result
创建对可观察对象的订阅之前执行完成 - 因此它永远不会看到该值。
请尝试以下代码:
private async Task SimpleTest()
{
var service = new Service();
var result = service.GetProgress().Take(1);
var awaiter = result.GetAwaiter();
var task = Task.Run(() =>
{
service.Reset();
});
await awaiter;
}
行await result
创建对可观察量的订阅。问题是通知_subject.OnNext(0.0)
可能发生在此订阅之前,在这种情况下,该值将在未观察到的情况下传递,并且await result
将永远继续等待通知。在这个特定示例中,通知总是被遗漏,至少在我的 PC 中是这样,因为订阅延迟了大约 30 毫秒(用Stopwatch
测量(,这比重置服务的任务完成所需的时间要长,可能是因为 JITer 必须加载和编译一些与 RX 相关的程序集。当我在运行示例之前通过调用new Subject<int>().FirstAsync().Subscribe()
进行预热时,情况会发生变化。在这种情况下,几乎总是会遵守通知,并避免挂起。
我可以想到这个问题的两个强有力的解决方案。
-
Enigmativity 建议的解决方案是在启动重置服务的任务之前创建可等待的订阅。这可以通过
GetAwaiter
或ToTask
来完成。 -
使用
ReplaySubject<T>
而不是普通的香草Subject<T>
.
表示既是可观察序列又是观察者的对象。每个通知都会广播给所有订阅和未来的观察者,但须遵守缓冲区修整策略。
ReplaySubject
将缓存该值,以便将来的订阅可以观察该值,从而消除争用条件。可以使用 1bufferSize
对其进行初始化,以最大程度地减少缓冲区的内存占用。