如何将 Rx.Nex 扩展用于具有异步操作的 EachAsync



我有代码将数据从SQL流式传输并将其写入不同的存储。代码大致如下:

using (var cmd = new SqlCommand("select * from MyTable", connection))
{
using (var reader = await cmd.ExecuteReaderAsync())
{
var list = new List<MyData>();
while (await reader.ReadAsync())
{
var row = GetRow(reader);
list.Add(row);
if (list.Count == BatchSize)
{
await WriteDataAsync(list);
list.Clear();
}
}
if (list.Count > 0)
{
await WriteDataAsync(list);
}
}
}

我想为此目的使用反应式扩展。理想情况下,代码如下所示:

await StreamDataFromSql()
.Buffer(BatchSize)
.ForEachAsync(async batch => await WriteDataAsync(batch));

但是,似乎扩展方法 ForEachAsync 只接受同步操作。是否可以编写一个接受异步操作的扩展?

是否可以编写一个接受异步操作的扩展?

不直接。

Rx 订阅必须是同步的,因为 Rx 是基于推送的系统。当数据项到达时,它会遍历您的查询,直到到达最终订阅 - 在本例中是执行Action

Rx 提供的await-able 方法await序列本身- 即,ForEachAsync序列方面是异步的(您正在异步等待序列完成(,但ForEachAsync中的订阅(为每个元素执行的操作(仍然必须是同步的。

若要在数据管道中执行同步到异步转换,需要有一个缓冲区。Rx 订阅可以(同步(作为生成者添加到缓冲区,而异步使用者正在检索项目并处理它们。因此,您需要一个同时支持同步和异步操作的生产者/使用者队列。

TPL 数据流中的各种块类型可以满足此需求。这样的事情应该就足够了:

var obs = StreamDataFromSql().Buffer(BatchSize);
var buffer = new ActionBlock<IList<T>>(batch => WriteDataAsync(batch));
using (var subscription = obs.Subscribe(buffer.AsObserver()))
await buffer.Completion;

请注意,没有背压;只要StreamDataFromSql可以推送数据,它就会被缓冲并存储在ActionBlock的传入队列中。根据数据的大小和类型,这可能会快速使用大量内存。

正确的做法是正确使用反应式扩展来完成此操作 - 因此请从创建连接开始,直到写入数据。

方法如下:

IObservable<IList<MyData>> query =
Observable
.Using(() => new SqlConnection(""), connection =>
Observable
.Using(() => new SqlCommand("select * from MyTable", connection), cmd =>
Observable
.Using(() => cmd.ExecuteReader(), reader =>
Observable
.While(() => reader.Read(), Observable.Return(GetRow(reader))))))
.Buffer(BatchSize);
IDisposable subscription =
query
.Subscribe(async list => await WriteDataAsync(list));

我无法测试代码,但它应该可以工作。此代码假定WriteDataAsync也可以采用IList<MyData>。如果它不只是掉在.ToList()

.

下面是支持异步操作的ForEachAsync方法的一个版本。它将可观察的源投影到包含异步操作的嵌套IObservable<IObservable<Unit>>,然后使用Merge运算符将其平展回IObservable<Unit>。生成的可观察量最终转换为任务。

默认情况下,操作按顺序调用,但可以通过配置可选的maximumConcurrency参数并发调用它们。

取消可选的cancellationToken参数会导致返回的Task立即完成(取消(,可能在取消当前正在运行的操作之前。

可能发生的任何异常都会通过Task传播,并导致取消所有当前正在运行的操作。

/// <summary>
/// Invokes an asynchronous action for each element in the observable sequence,
/// and returns a 'Task' that represents the completion of the sequence and
/// all the asynchronous actions.
/// </summary>
public static Task ForEachAsync<TSource>(
this IObservable<TSource> source,
Func<TSource, CancellationToken, Task> action,
CancellationToken cancellationToken = default,
int maximumConcurrency = 1)
{
// Arguments validation omitted
return source
.Select(item => Observable.FromAsync(ct => action(item, ct)))
.Merge(maximumConcurrency)
.DefaultIfEmpty()
.ToTask(cancellationToken);
}

使用示例:

await StreamDataFromSql()
.Buffer(BatchSize)
.ForEachAsync(async (batch, token) => await WriteDataAsync(batch, token));

以下是 ForEachAsync 的源代码和一篇关于 ToEnumerable 和 AsObservable 方法的文章

我们可以围绕 ForEachAsync 做一个包装器,等待一个任务返回函数:

public static async Task ForEachAsync<T>( this IObservable<T> t, Func<T, Task> onNext )
{
foreach ( var x in t.ToEnumerable() )
await onNext( x );
}

用法示例:

await ForEachAsync( Observable.Range(0, 10), async x => await Task.FromResult( x ) );

最新更新