我有以下2个Async方法使用Action回调
public interface IGeoCoordinateService
{
void Start(Action<GeoPosition<GeoCoordinate>> positionCallback, Action<Exception> exceptionCallback);
}
public interface IGooglePlacesService
{
void FindLocations(GeoCoordinate geoCoordinate, SearchFilter searchFilter, Action<PlaceSearchResponse> searchCallback);
}
- 我必须确保在我想执行GooglePlacesService之前我有一个来自GeoCoordinateService的结果。
- 我还想执行GooglePlacesService每次GeoCoordinate变化(而不是null)
用Rx做这个最简单的方法是什么?
最简单的方法是首先将接口转换为IObservables。为此,我将对您的API做一些假设:
- 一旦启动,IGeoCoordinateService可能会多次调用positionCallback。
- 如果IGeoCoordinateService调用了exceptionCallback,它就完成了,不会再调用positionCallback。(这个假设需要匹配IObservable合约。)
- IGooglePlacesService只调用一次searchCallback。(这个假设是有用的SelectMany我将稍后做。)
从那里,你可以写这些函数:
public IObservable<GeoPosition<GeoCoordinate>> ToObservable(this IGeoCoordinateService service)
{
return Observable.Create((IObserver<GeoPosition<GeoCoordinate>> observer) =>
{
service.Start(observer.OnNext, observer.OnError);
//nothing to do on unsubscribe, cannot cancel run
return (() => {});
})
}
public Func<GeoCoordinate, SearchFilter, IObservable<PlaceSearchResponse>>
ToObservable(IGooglePlacesService service)
{
return (coord, filter) =>
{
return Observable.Create((IObserver<PlaceSearchResponse> observer) =>
{
service.FindLocations(coord, filter,
(value) =>
{
observer.OnNext(value);
observer.OnCompleted();
});
//nothing to do on unsubscribe, cannot cancel run
return (() => {});
})
}
}
现在你可以将服务转换为IObservables,你可以像这样链接它们(希望你能想出一个更好的函数名):
public IObservable<PlaceSearchResponse> ChainServices(IGeoCoordinateService geo, IGooglePlacesService place, SearchFilter filter)
{
return from pos in geo.ToObservable()
where pos != null && pos.Coordinate != null
from placeResponsen place.ToObservable()(pos.Coordinate, filter)
select placeResponse;
}
请注意,和通常的IObservables一样,调用ChainServices不做任何事情,你必须在返回的IObservable上调用Subscribe才能真正进行异步调用。
一些API可以使用的东西,如果你有任何控制:
- 通过某种方式取消调用(IObservable通过从subscribe返回IDisposable来实现这一点)
- 某种方法来告诉IGeoCoordinateService何时完成,以便能够在观察者上调用OnCompleted(假设它多次调用positionCallback)