如果与 async/await 一起使用,返回 IEnumerable 会发生什么情况(使用 Dapper 从 SQL



我正在使用Dapper从SQL Server中非常大的集合流式传输数据。它与返回IEnumerable并调用Query()一起工作,但是当我切换到QueryAsync()时,似乎该程序会尝试从SQL Server读取所有数据而不是流式传输。

根据这个问题,它应该可以正常工作buffered: false,我正在这样做,但这个问题没有说明async/await.

现在根据这个问题,用QueryAsync()做我想做的事并不简单.

我是否正确理解在上下文切换时迭代枚举async/await

另一个问题,当新的 C#8 异步流可用时,是否可以执行此操作?

2020 年 3 月更新

.NET Core 3.0(和3.1)现已推出,完全支持异步流。Microsoft.Bcl.AsyncInterfaces在.NET Standard 2.0和.NET Framework 4.6.1+中添加了对它们的支持,尽管出于健全的原因应使用4.7.2。正如 .NET 标准实现支持上的文档所解释的那样

虽然 NuGet 认为 .NET Framework 4.6.1 支持 .NET

Standard 1.5 到 2.0,但在使用 .NET Framework 4.6.1 项目中为这些版本构建的 .NET Standard 库时存在几个问题。

对于需要使用此类库的 .NET Framework 项目,我们建议您将项目升级到面向 .NET Framework 4.7.2 或更高版本。

原始答案

如果你检查源代码,你会发现你的怀疑几乎是正确的。当buffered为 false 时,QueryAsync同步流式传输。

if (command.Buffered)
{
var buffer = new List<T>();
var convertToType = Nullable.GetUnderlyingType(effectiveType) ?? effectiveType;
while (await reader.ReadAsync(cancel).ConfigureAwait(false))
{
object val = func(reader);
if (val == null || val is T)
{
buffer.Add((T)val);
}
else
{
buffer.Add((T)Convert.ChangeType(val, convertToType, CultureInfo.InvariantCulture));
}
}
while (await reader.NextResultAsync(cancel).ConfigureAwait(false)) { /* ignore subsequent result sets */ }
command.OnCompleted();
return buffer;
}
else
{
// can't use ReadAsync / cancellation; but this will have to do
wasClosed = false; // don't close if handing back an open reader; rely on the command-behavior
var deferred = ExecuteReaderSync<T>(reader, func, command.Parameters);
reader = null; // to prevent it being disposed before the caller gets to see it
return deferred;
}

正如注释所解释的那样,当返回类型预期为 IEnumerable 时,无法使用ReadAsync。这就是必须引入 C# 8 的异步枚举的原因。

ExecuteReaderSync 的代码是:

private static IEnumerable<T> ExecuteReaderSync<T>(IDataReader reader, Func<IDataReader, object> func, object parameters)
{
using (reader)
{
while (reader.Read())
{
yield return (T)func(reader);
}
while (reader.NextResult()) { /* ignore subsequent result sets */ }
(parameters as IParameterCallbacks)?.OnCompleted();
}
}

它使用Read而不是ReadAsync

C#8 异步流将允许重写此流以返回IAsyncEnumerable.简单地更改语言版本并不能解决问题。

鉴于异步流上的当前文档,这可能如下所示:

private static async IAsyncEnumerable<T> ExecuteReaderASync<T>(IDataReader reader, Func<IDataReader, object> func, object parameters)
{
using (reader)
{
while (await reader.ReadAsync())
{
yield return (T)func(reader);
}
while (await reader.NextResultAsync(cancel).ConfigureAwait(false)) { /* ignore subsequent result sets */ }
command.OnCompleted();
(parameters as IParameterCallbacks)?.OnCompleted();
}
}

Buuuuuut异步流是只能在 .NET Core 上运行的东西之一,可能尚未实现。当我试图用 Sharplab.io 写一个时,卡布姆。[connection lost, reconnecting…]

特别是在dapper的上下文中,是的:它需要一个不同的API,正如@Panagiotis的出色答案所解释的那样。接下来的不是这样的答案,而是面临相同挑战的实现者可能希望考虑的其他上下文。

我还没有为dapper"加标"这个(尽管我SE。Redis),我在各种选项之间左右为难:

  1. 仅为.NET Core 添加新的 API,返回适当的异步枚举类型
  2. 完全粉碎现有 API 作为重大更改("主要"等),将其更改为返回异步枚举类型

我们可能会选择"1",但我不得不说,第二个选项异常诱人,这是有充分理由的:

  • 现有的 API 可能无法完成人们期望它做的事情
  • 我们希望新代码开始使用它

但奇怪的是IAsyncEnumerable<T>的.NET Core 3.0 - 因为显然Dapper不仅仅针对.NET Core 3.0;我们可以:

  1. 将该功能限制为 .NET Core 3.0,并返回IAsyncEnumerable<T>
  2. 限制为 .NET Core 3.0,并返回IAsyncEnumerable<T>
  3. 依赖于
  4. 以前的框架的System.Linq.Async(它不是"官方的",但对于我们的目的来说已经足够官方了),并返回IAsyncEnumerable<T>
  5. 返回一个实际上不是IAsyncEnumerable<T>的自定义可枚举类型(但在可用时实现IAsyncEnumerable<T>),并手动实现状态机 -foreach的鸭子类型性质意味着只要我们的自定义可枚举类型提供正确的方法,这将正常工作

我想我们可能会选择选项3,但要重申:是的,有些事情需要改变。

(这应该是一个评论//到目前为止,声誉还不够)

Marc Gravell 在他的回复中提到IAsyncEnumerable<T>会更好,但由于依赖于 NET Core 3.0,最好依赖System.Linq.Async(这可以被认为是"足够官方")......

在这种情况下,我想到了 https://github.com/Dasync/AsyncEnumerable(MIT许可证): 它旨在帮助

。(a) 创建一个元素提供程序,由于依赖于其他异步事件(例如等待句柄、网络流),生成元素可能需要大量时间,以及 (b) 一个使用者,该使用者在元素准备就绪后立即处理这些元素而不会阻塞线程(处理改为在工作线程上调度)。

还有一个问题,RE:"发布 C# 8.0 时会发生什么?(常见问题)

C# 8.0 应具有异步流功能。当语言的版本最终实现时,它应该是应用程序的直接升级路径。

最新更新