用响应式扩展处理一批请求



我正在学习响应式扩展,我一直在试图找出它是否适合这样的任务。

我有一个Process()方法,它将一批请求作为一个工作单元处理,并在所有请求完成时调用回调。

重要的是,每个请求将调用回调或同步或异步取决于它的实现,批处理处理器必须能够处理这两个。

但是没有线程从批处理程序启动,任何新的线程(或其他异步执行)将在必要时从请求处理程序内部启动。我不知道这是否符合rx的用例。

我当前的工作代码看起来(几乎)是这样的:

public void Process(ICollection<IRequest> requests, Action<List<IResponse>> onCompleted)
{
    IUnitOfWork uow = null;
    try
    {
        uow = unitOfWorkFactory.Create();
        var responses = new List<IResponse>();
        var outstandingRequests = requests.Count;
        foreach (var request in requests)
        {
            var correlationId = request.CorrelationId;
            Action<IResponse> requestCallback = response =>
            {
                response.CorrelationId = correlationId;
                responses.Add(response);
                outstandingRequests--;
                if (outstandingRequests != 0)
                    return;
                uow.Commit();
                onCompleted(responses);
            };
            requestProcessor.Process(request, requestCallback);
        }
    }
    catch(Exception)
    {
        if (uow != null) 
            uow.Rollback();
    }
    if (uow != null) 
        uow.Commit();
}        

如何使用rx实现这个?这合理吗?

请注意,即使存在尚未返回的异步请求,也要同步提交工作单元。

我的方法是两步走。

首先创建一个通用运算符,将Action<T, Action<R>>变为Func<T, IObservable<R>>:

public static class ObservableEx
{
    public static Func<T, IObservable<R>> FromAsyncCallbackPattern<T, R>(
        this Action<T, Action<R>> call)
    {
        if (call == null) throw new ArgumentNullException("call");
        return t =>
        {
            var subject = new AsyncSubject<R>();
            try
            {
                Action<R> callback = r =>
                {
                    subject.OnNext(r);
                    subject.OnCompleted();
                };
                call(t, callback);
            }
            catch (Exception ex)
            {
                return Observable.Throw<R>(ex, Scheduler.ThreadPool);
            }
            return subject.AsObservable<R>();
        };
    }
}

接下来,将呼叫void Process(ICollection<IRequest> requests, Action<List<IResponse>> onCompleted)变为IObservable<IResponse> Process(IObservable<IRequest> requests):

public IObservable<IResponse> Process(IObservable<IRequest> requests)
{
    Func<IRequest, IObservable<IResponse>> rq2rp =
        ObservableEx.FromAsyncCallbackPattern
            <IRequest, IResponse>(requestProcessor.Process);
    var query = (
        from rq in requests
        select rq2rp(rq)).Concat();
    var uow = unitOfWorkFactory.Create();
    var subject = new ReplaySubject<IResponse>();
    query.Subscribe(
        r => subject.OnNext(r),
        ex =>
        {
            uow.Rollback();
            subject.OnError(ex);
        },
        () =>
        {
            uow.Commit();
            subject.OnCompleted();
        });
    return subject.AsObservable();
}

现在,这不仅异步运行处理,而且还确保了结果的正确顺序。

实际上,由于您从一个集合开始,您甚至可以这样做:

var rqs = requests.ToObservable();
var rqrps = rqs.Zip(Process(rqs),
    (rq, rp) => new
    {
        Request = rq,
        Response = rp,
    });

那么你就会有一个可观察对象来配对每个请求/响应,而不需要CorrelationId属性。

这是Rx的一部分天赋,因为您可以自由地以同步或异步方式返回结果:

public IObservable<int> AddNumbers(int a, int b) {
    return Observable.Return(a + b);  
}
public IObservable<int> AddNumbersAsync(int a, int b) {
    return Observable.Start(() => a + b, Scheduler.NewThread);
}

它们都有IObservable类型,所以它们的工作方式是一样的。如果你想知道所有的IObservables是什么时候完成的,Aggregate会这样做,因为它会把Observable中的n个item变成1个 item,并在最后返回:

IObservable<int> listOfObservables[];
listObservables.ToObservable()
    .Merge()
    .Aggregate(0, (acc, x) => acc+1)
    .Subscribe(x => Console.WriteLine("{0} items were run", x));

最新更新