给定:
IObservable<T> src
- CCD_ 2
- CCD_ 3只能按顺序调用。所以
await F(x);await F(y);
是好的,但Task.Factory.ContinueWhenAll(new[]{F(x),F(y)}, _ => {...});
是错误的,因为F(x)
和F(y)
不能同时运行
我很清楚await src.Do(F)
是错误的,因为它会同时运行F
。
我的问题是如何正确地做?
Observable.Do
仅用于副作用,不用于连续用药。SelectMany
可能是您想要的。从Rx 2.0开始,SelectMany
的过载使得用Task<T>
组成可观测值变得非常容易。(只需注意这些和类似的任务/可观察合作操作可能引入的额外并发性。)
var q = from value in src
from _ in F(value.X).AsVoidAsync() // See helper definition below
from __ in F(value.Y).AsVoidAsync()
select value;
但是,根据您特别询问Do
运算符的事实,我怀疑src可能包含多个值,并且您也不希望对每个值重复调用F。在这种情况下,考虑SelectMany
实际上和Select->Merge
一样;因此,您可能想要的是Select->Concat
。
// using System.Reactive.Threading.Tasks
var q = src.Select(x => Observable.Defer(() => F(x).ToObservable())).Concat();
不要忘记使用Defer
,因为F(x)
是热门。
AsVoidSync扩展:
async Task F(T){...}
0需要T,而Task
表示void,因此Rx的转换运算符要求我们从Task
中获得Task<T>
。我倾向于将Rx的System.Reactive.Unit结构用于T
:
public static class TaskExtensions
{
public static Task<Unit> AsVoidAsync(this Task task)
{
return task.ContinueWith(t =>
{
var tcs = new TaskCompletionSource<Unit>();
if (t.IsCanceled)
{
tcs.SetCanceled();
}
else if (t.IsFaulted)
{
tcs.SetException(t.Exception);
}
else
{
tcs.SetResult(Unit.Default);
}
return tcs.Task;
},
TaskContinuationOptions.ExecuteSynchronously)
.Unwrap();
}
}
或者,您可以始终只调用专门的ToObservable方法。
最简单的方法是使用TPL数据流,默认情况下,它将限制为单个并发调用,即使对于异步方法也是如此:
var block = new ActionBlock<T>(F);
src.Subscribe(block.AsObserver());
await block.Completion;
或者,您可以订阅一个自己节流的异步方法:
var semaphore = new SemaphoreSlim(1);
src.Do(async x =>
{
await semaphore.WaitAsync();
try
{
await F(x);
}
finally
{
semaphore.Release();
}
});