在C#中可观察到的非重新入侵



给定以下方法:

如果我将黑客hack放在原位,我的单位测试将立即使用"可观察的没有数据"完成。

如果我取出hack,则有多个线程都尝试同时登录。
主机服务不允许此

我如何确保在任何给定的时间点都只能生产一个线程生产

    private static object obj = new object();
    private static bool here = true;
    public IObservable<Party> LoadAllParties(CancellationToken token)
    {
        var parties = Observable.Create<Party>(
            async (observer, cancel) =>
            {
                // this is just a hack to test behavior
                lock (obj)
                {
                    if (!here)
                        return;
                    here = false;
                }
                // end of hack.
                try
                {
                    if (!await this.RequestLogin(observer, cancel))
                        return;
                    // request list.
                    await this._request.GetAsync(this._configuration.Url.RequestList);
                    if (this.IsCancelled(observer, cancel))
                        return;
                    while (!cancel.IsCancellationRequested)
                    {
                        var entities = await this._request.GetAsync(this._configuration.Url.ProcessList);
                        if (this.IsCancelled(observer, cancel))
                            return;
                        var tranche = this.ExtractParties(entities);
                        // break out if it's the last page.
                        if (!tranche.Any())
                            break;
                        Array.ForEach(tranche, observer.OnNext);
                        await this._request.GetAsync(this._configuration.Url.ProceedList);
                        if (this.IsCancelled(observer, cancel))
                            return;
                    }
                    observer.OnCompleted();
                }
                catch (Exception ex)
                {
                    observer.OnError(ex);
                }
            });
        return parties;
    }

我的单位测试:

var sut = container.Resolve<SyncDataManager>();
var count = 0;
var token = new CancellationTokenSource();
var observable = sut.LoadAllParties(token.Token);
observable.Subscribe(party => count++);
await observable.ToTask(token.Token);
count.Should().BeGreaterThan(0);

我确实认为您的问题遇到了XY问题 - 该代码包含几个未包括的方法,其中可能包含重要的副作用,我觉得可以使用可用的信息不会导致最佳建议。

也就是说,我怀疑您不打算两次订阅observable - 一次使用明确的Subscribe呼叫,然后一次使用ToTask()调用。这肯定会解释并发电话,这些呼叫发生在两个不同的订阅中。

编辑:

宣称长度如何(调整适合的超时):

var length = await observable.Count().Timeout(TimeSpan.FromSeconds(3));

最好是研究RX测试并模拟您的依赖性。这是一个很大的话题,但是RX团队的这篇长博客文章很好地解释了这一点,有关TPL-RX相互作用的答案可能会有所帮助:在反应性管道中执行TPL代码,并通过测试调度程序控制执行程序

相关内容

  • 没有找到相关文章

最新更新