将"IObservable<T>"转换为"IEnumerable<Task<T>>"?



我有几个使用回调或事件的异步 API 而不是async。我成功地使用TaskCompletionSource来包装它们,如此处所述。

现在,我想使用一个返回IObservable<T>并生成多个对象的 API。我读到了关于 .NET 的 Rx,这似乎是要走的路。但是,我对包含另一个依赖项和另一个新范例犹豫不决,因为我已经在这个应用程序中使用了很多对我来说新的东西(如 XAML、MVVM、C# 的异步/等待(。

有没有办法包装IObservable<T>类似于包装单个回调 API 的方式?我想这样调用 API:

foreach (var t in GetMultipleInstancesAsync()) {
var res = await t;
Console.WriteLine("Received item:", res);
}

如果可观察量发出多个值,则可以将它们转换为Task<T>,然后将它们添加到任何IEnumerable结构中。

检查 IObservable ToTask。如此处所述,在等待之前必须完成可观察量,否则可能会有更多的值出现。

本指南在这里也可以为您解决问题

public static Task<IList<T>> BufferAllAsync<T>(this IObservable<T> observable)
{
List<T> result = new List<T>();
object gate = new object();
TaskCompletionSource<IList<T>> finalTask = new TaskCompletionSource<IList<T>>();
observable.Subscribe(
value =>
{
lock (gate)
{
result.Add(value);
}
},
exception => finalTask.TrySetException(exception),
() => finalTask.SetResult(result.AsReadOnly())
);
return finalTask.Task;
}

如果你想使用 Rx,那么你可以使用你的返回列表:

GetMultipleInstancesAsync().ToObservable().Subscribe(...);

您可以订阅 OnComplete/OnError 处理程序。

您也可以将其包装为任务列表:

var result = await Task.WhenAll(GetMultipleInstancesAsync().ToArray());

所以你得到了一系列的结果,你就完成了。

相关内容

  • 没有找到相关文章

最新更新