菜鸟问题 - 监视返回 Action <T>的 2 个异步方法的结果



我有以下2个Async方法使用Action回调

public interface IGeoCoordinateService
{
    void Start(Action<GeoPosition<GeoCoordinate>> positionCallback, Action<Exception> exceptionCallback);
}
public interface IGooglePlacesService
{
    void FindLocations(GeoCoordinate geoCoordinate, SearchFilter searchFilter, Action<PlaceSearchResponse> searchCallback);
}
  • 我必须确保在我想执行GooglePlacesService之前我有一个来自GeoCoordinateService的结果。
  • 我还想执行GooglePlacesService每次GeoCoordinate变化(而不是null)

用Rx做这个最简单的方法是什么?

最简单的方法是首先将接口转换为IObservables。为此,我将对您的API做一些假设:

  • 一旦启动,IGeoCoordinateService可能会多次调用positionCallback。
  • 如果IGeoCoordinateService调用了exceptionCallback,它就完成了,不会再调用positionCallback。(这个假设需要匹配IObservable合约。)
  • IGooglePlacesService只调用一次searchCallback。(这个假设是有用的SelectMany我将稍后做。)

从那里,你可以写这些函数:

public IObservable<GeoPosition<GeoCoordinate>> ToObservable(this IGeoCoordinateService service)
{
    return Observable.Create((IObserver<GeoPosition<GeoCoordinate>> observer) =>
                             {
                                 service.Start(observer.OnNext, observer.OnError);
                                 //nothing to do on unsubscribe, cannot cancel run
                                 return (() => {});
                             })
}
public Func<GeoCoordinate, SearchFilter, IObservable<PlaceSearchResponse>>
ToObservable(IGooglePlacesService service)
{
    return (coord, filter) =>
           {
               return Observable.Create((IObserver<PlaceSearchResponse> observer) =>
                                        {
                                            service.FindLocations(coord, filter,
                                                                  (value) =>
                                                                  {
                                                                      observer.OnNext(value);
                                                                      observer.OnCompleted();
                                                                  });
                                            //nothing to do on unsubscribe, cannot cancel run
                                            return (() => {});
                                        })
           }
}

现在你可以将服务转换为IObservables,你可以像这样链接它们(希望你能想出一个更好的函数名):

public IObservable<PlaceSearchResponse> ChainServices(IGeoCoordinateService geo, IGooglePlacesService place, SearchFilter filter)
{
    return from pos in geo.ToObservable()
           where pos != null && pos.Coordinate != null
           from placeResponsen place.ToObservable()(pos.Coordinate, filter)
           select placeResponse;
}

请注意,和通常的IObservables一样,调用ChainServices不做任何事情,你必须在返回的IObservable上调用Subscribe才能真正进行异步调用。

一些API可以使用的东西,如果你有任何控制:

  • 通过某种方式取消调用(IObservable通过从subscribe返回IDisposable来实现这一点)
  • 某种方法来告诉IGeoCoordinateService何时完成,以便能够在观察者上调用OnCompleted(假设它多次调用positionCallback)

相关内容

  • 没有找到相关文章