我的Rx订阅使用Switch语句时出现问题。
_performSearchSubject
.AsObservable()
.Select(_ => PerformQuery())
.Switch()
.ObserveOn(_synchronizationContextService.SynchronizationContext)
.Subscribe(DataArrivedForPositions, PositionQueryError, PositionQueryCompleted)
.DisposeWith(this);
流程为:
- 某些属性发生更改,性能SearchSubject.ONext被调用
- 调用PerformPositionQuery(),它在每次命中时返回一个观察者
- 当数据接收完成时,通过该观察器进行响应的服务调用OnNext两次,调用OnCompleted一次
- 方法DataArrivedForPositions的调用次数是预期的两次
- 方法PositionQueryCompleted从未被调用,尽管是observer。在我的数据服务中调用了OnCompleted()
数据服务的代码是:
protected override void Request(Request request, IObserver<Response> observer)
{
query.Arrive += p => QueryReceive(request.RequestId, p, observer, query);
query.Error += (type, s, message) => QueryError(observer, message);
query.NoMoreData += id => QueryCompleted(observer);
query.Execute(request);
}
private void QueryError(IObserver<PositionSheetResponse> observer, string message)
{
observer.OnError(new Exception(message));
}
private void QueryCompleted(IObserver<PositionSheetResponse> observer)
{
observer.OnCompleted();
}
private void QueryReceive(Guid requestId, Qry0079Receive receiveData, IObserver<PositionSheetResponse> observer, IQry0079PositionSheet query)
{
observer.OnNext(ConvertToResponse(requestId, receiveData));
}
Switch
结果只有在外部可观察对象(_performSearchSubject
)完成时才会完成。我认为在您的情况下,这个永远不会(它可能与执行搜索的用户操作绑定)。
不清楚的是您期望何时调用PositionQueryCompleted
。如果它是在处理完每个成功的查询之后,那么您的流就需要修改,因为Switch
会丢失查询流完成的信息,但它也缺乏关于UI的信息(甚至是错误的调度程序)来说明其数据是否真的被处理过。
可能还有其他方法可以实现这一点,但基本上,您希望您的查询流完整地通过Switch
(当前忽略此事件)生存。例如,您可以将查询流转换为n+1个事件,并为完整事件添加一个额外的事件:
_performSearchSubject
.AsObservable()
.Select(_ =>
PerformQuery()
.Select(Data => new { Data, Complete = false})
.Concat(Observable.Return(new { Data = (string)null, Complete = true })))
你可以安全地在上面应用.Switch().ObserveOn(_synchronizationContextService.SynchronizationContext)
,但随后你需要修改你的订阅:
.Subscribe(data => {
if (data.Complete) DataArrivedForPositions(data.Data);
else PositionQueryCompleted()
}, PositionQueryError)