给定以下方法:
如果我将黑客hack放在原位,我的单位测试将立即使用"可观察的没有数据"完成。
如果我取出hack,则有多个线程都尝试同时登录。
主机服务不允许此。
我如何确保在任何给定的时间点都只能生产一个线程是生产。
private static object obj = new object();
private static bool here = true;
public IObservable<Party> LoadAllParties(CancellationToken token)
{
var parties = Observable.Create<Party>(
async (observer, cancel) =>
{
// this is just a hack to test behavior
lock (obj)
{
if (!here)
return;
here = false;
}
// end of hack.
try
{
if (!await this.RequestLogin(observer, cancel))
return;
// request list.
await this._request.GetAsync(this._configuration.Url.RequestList);
if (this.IsCancelled(observer, cancel))
return;
while (!cancel.IsCancellationRequested)
{
var entities = await this._request.GetAsync(this._configuration.Url.ProcessList);
if (this.IsCancelled(observer, cancel))
return;
var tranche = this.ExtractParties(entities);
// break out if it's the last page.
if (!tranche.Any())
break;
Array.ForEach(tranche, observer.OnNext);
await this._request.GetAsync(this._configuration.Url.ProceedList);
if (this.IsCancelled(observer, cancel))
return;
}
observer.OnCompleted();
}
catch (Exception ex)
{
observer.OnError(ex);
}
});
return parties;
}
我的单位测试:
var sut = container.Resolve<SyncDataManager>();
var count = 0;
var token = new CancellationTokenSource();
var observable = sut.LoadAllParties(token.Token);
observable.Subscribe(party => count++);
await observable.ToTask(token.Token);
count.Should().BeGreaterThan(0);
我确实认为您的问题遇到了XY问题 - 该代码包含几个未包括的方法,其中可能包含重要的副作用,我觉得可以使用可用的信息不会导致最佳建议。
也就是说,我怀疑您不打算两次订阅observable
- 一次使用明确的Subscribe
呼叫,然后一次使用ToTask()
调用。这肯定会解释并发电话,这些呼叫发生在两个不同的订阅中。
编辑:
宣称长度如何(调整适合的超时):
var length = await observable.Count().Timeout(TimeSpan.FromSeconds(3));
最好是研究RX测试并模拟您的依赖性。这是一个很大的话题,但是RX团队的这篇长博客文章很好地解释了这一点,有关TPL-RX相互作用的答案可能会有所帮助:在反应性管道中执行TPL代码,并通过测试调度程序控制执行程序