反应式扩展(Rx)Switch()生成新的可观察项,该可观察项未订阅所提供的OnCompleted()



我的Rx订阅使用Switch语句时出现问题。

_performSearchSubject
            .AsObservable()
            .Select(_ => PerformQuery())
            .Switch()
            .ObserveOn(_synchronizationContextService.SynchronizationContext)
            .Subscribe(DataArrivedForPositions, PositionQueryError, PositionQueryCompleted)
            .DisposeWith(this);

流程为:

  1. 某些属性发生更改,性能SearchSubject.ONext被调用
  2. 调用PerformPositionQuery(),它在每次命中时返回一个观察者
  3. 当数据接收完成时,通过该观察器进行响应的服务调用OnNext两次,调用OnCompleted一次
  4. 方法DataArrivedForPositions的调用次数是预期的两次
  5. 方法PositionQueryCompleted从未被调用,尽管是observer。在我的数据服务中调用了OnCompleted()

数据服务的代码是:

        protected override void Request(Request request, IObserver<Response> observer)
        {
            query.Arrive += p => QueryReceive(request.RequestId, p, observer, query);
            query.Error += (type, s, message) => QueryError(observer, message);
            query.NoMoreData += id => QueryCompleted(observer);
            query.Execute(request);
        }
        private void QueryError(IObserver<PositionSheetResponse> observer, string message)
        {
            observer.OnError(new Exception(message));
        }
        private void QueryCompleted(IObserver<PositionSheetResponse> observer)
        {
            observer.OnCompleted();
        }
        private void QueryReceive(Guid requestId, Qry0079Receive receiveData, IObserver<PositionSheetResponse> observer, IQry0079PositionSheet query)
        {
            observer.OnNext(ConvertToResponse(requestId, receiveData));
        }

Switch结果只有在外部可观察对象(_performSearchSubject)完成时才会完成。我认为在您的情况下,这个永远不会(它可能与执行搜索的用户操作绑定)。

不清楚的是您期望何时调用PositionQueryCompleted。如果它是在处理完每个成功的查询之后,那么您的流就需要修改,因为Switch会丢失查询流完成的信息,但它也缺乏关于UI的信息(甚至是错误的调度程序)来说明其数据是否真的被处理过。

可能还有其他方法可以实现这一点,但基本上,您希望您的查询流完整地通过Switch(当前忽略此事件)生存。例如,您可以将查询流转换为n+1个事件,并为完整事件添加一个额外的事件:

    _performSearchSubject
        .AsObservable()
        .Select(_ => 
                  PerformQuery()
                  .Select(Data => new { Data, Complete = false})
                  .Concat(Observable.Return(new { Data = (string)null, Complete = true })))

你可以安全地在上面应用.Switch().ObserveOn(_synchronizationContextService.SynchronizationContext),但随后你需要修改你的订阅:

    .Subscribe(data => {
        if (data.Complete) DataArrivedForPositions(data.Data);
        else PositionQueryCompleted()
    }, PositionQueryError)

相关内容

  • 没有找到相关文章

最新更新