如何在Observable.Do扩展方法内部传递异步方法



给定:

  1. IObservable<T> src
  2. CCD_ 2
  3. CCD_ 3只能按顺序调用。所以await F(x);await F(y);是好的,但Task.Factory.ContinueWhenAll(new[]{F(x),F(y)}, _ => {...});是错误的,因为F(x)F(y)不能同时运行

我很清楚await src.Do(F)是错误的,因为它会同时运行F

我的问题是如何正确地做?

Observable.Do仅用于副作用,不用于连续用药。SelectMany可能是您想要的。从Rx 2.0开始,SelectMany的过载使得用Task<T>组成可观测值变得非常容易。(只需注意这些和类似的任务/可观察合作操作可能引入的额外并发性。)

var q = from value in src
        from _ in F(value.X).AsVoidAsync()  // See helper definition below
        from __ in F(value.Y).AsVoidAsync()
        select value;

但是,根据您特别询问Do运算符的事实,我怀疑src可能包含多个值,并且您也不希望对每个值重复调用F。在这种情况下,考虑SelectMany实际上和Select->Merge一样;因此,您可能想要的是Select->Concat

// using System.Reactive.Threading.Tasks
var q = src.Select(x => Observable.Defer(() => F(x).ToObservable())).Concat();

不要忘记使用Defer,因为F(x)热门

AsVoidSync扩展

async Task F(T){...}0需要T,而Task表示void,因此Rx的转换运算符要求我们从Task中获得Task<T>。我倾向于将Rx的System.Reactive.Unit结构用于T:

public static class TaskExtensions
{
  public static Task<Unit> AsVoidAsync(this Task task)
  {
    return task.ContinueWith(t =>
    {
      var tcs = new TaskCompletionSource<Unit>();
      if (t.IsCanceled)
      {
        tcs.SetCanceled();
      }
      else if (t.IsFaulted)
      {
        tcs.SetException(t.Exception);
      }
      else
      {
        tcs.SetResult(Unit.Default);
      }
      return tcs.Task;
    },
    TaskContinuationOptions.ExecuteSynchronously)
    .Unwrap();
  }
}

或者,您可以始终只调用专门的ToObservable方法。

最简单的方法是使用TPL数据流,默认情况下,它将限制为单个并发调用,即使对于异步方法也是如此:

var block = new ActionBlock<T>(F);
src.Subscribe(block.AsObserver());
await block.Completion;

或者,您可以订阅一个自己节流的异步方法:

var semaphore = new SemaphoreSlim(1);
src.Do(async x =>
{
  await semaphore.WaitAsync();
  try
  {
    await F(x);
  }
  finally
  {
    semaphore.Release();
  }
});

相关内容

  • 没有找到相关文章

最新更新