如何使用 SqlDataReader 返回和使用 IAsyncEnumerable



请看下面两种方法。 第一个返回一个IAsyncEnumerable。 第二个试图消耗它。

using System.Collections.Generic;
using System.Data;
using System.Data.SqlClient;
using System.Linq;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
public static class SqlUtility
{
public static async IAsyncEnumerable<IDataRecord> GetRecordsAsync(
string connectionString, SqlParameter[] parameters, string commandText,
[EnumeratorCancellation]CancellationToken cancellationToken)
{
using (SqlConnection connection = new SqlConnection(connectionString))
{
await connection.OpenAsync(cancellationToken).ConfigureAwait(false);
using (SqlCommand command = new SqlCommand(commandText, connection))
{
command.Parameters.AddRange(parameters);
using (var reader = await command.ExecuteReaderAsync()
.ConfigureAwait(false))
{
while (await reader.ReadAsync().ConfigureAwait(false))
{
yield return reader;
}
}
}
}
}
public static async Task Example()
{
const string connectionString =
"Server=localhost;Database=[Redacted];Integrated Security=true";
SqlParameter[] parameters = new SqlParameter[]
{
new SqlParameter("VideoID", SqlDbType.Int) { Value = 1000 }
};
const string commandText = "select * from Video where VideoID=@VideoID";
IAsyncEnumerable<IDataRecord> records = GetRecordsAsync(connectionString,
parameters, commandText, CancellationToken.None);
IDataRecord firstRecord = await records.FirstAsync().ConfigureAwait(false);
object videoID = firstRecord["VideoID"]; //Should be 1000.
// Instead, I get this exception:
// "Invalid attempt to call MetaData when reader is closed."
}
}

当代码尝试读取生成的IDataReader(在object videoID = firstRecord["VideoID"];时),我得到以下异常:

关闭读取器时调用元数据的尝试无效。

这是因为SqlDataReader被处置了。 有人可以提供以异步方式枚举SqlDataReader的推荐方法,以便每个结果记录都可用于调用方法吗? 谢谢。

在此方案中,LINQ 不是您的朋友,因为FirstAsync将在迭代器返回结果之前关闭迭代器,这不是 ADO.NET 期望的;基本上:不要在这里使用 LINQ,或者至少不要这样。您可以使用类似Select的东西在序列仍处于打开状态时执行投影,或者将此处的所有工作卸载到 Dapper 等工具可能更容易。或者,要手动执行此操作:

await foreach (var record in records)
{
// TODO: process record
// (perhaps "break"), because you only want the first
}

您可以通过不返回依赖于连接仍处于打开状态的对象来避免这种情况。例如,如果您只需要VideoID,则只需返回该(我假设它是一个int):

public static async IAsyncEnumerable<int> GetRecordsAsync(string connectionString, SqlParameter[] parameters, string commandText, [EnumeratorCancellation]CancellationToken cancellationToken)
{
...
yield return reader["VideoID"];
...
}

或者投影到你自己的类中:

public class MyRecord {
public int VideoId { get; set; }
}
public static async IAsyncEnumerable<MyRecord> GetRecordsAsync(string connectionString, SqlParameter[] parameters, string commandText, [EnumeratorCancellation]CancellationToken cancellationToken)
{
...
yield return new MyRecord {
VideoId = reader["VideoID"]
}
...
}

或者按照 Marc 的建议进行操作,并在第一个之后使用foreachbreak,在您的情况下如下所示:

IAsyncEnumerable<IDataRecord> records = GetRecordsAsync(connectionString, parameters, commandText, CancellationToken.None);
object videoID;
await foreach (var record in records)
{
videoID = record["VideoID"];
break;
}

当你公开一个开放的DataReader时,关闭它和底层Connection的响应现在属于调用者,所以你不应该释放任何东西。相反,您应该使用接受CommandBehavior参数的DbCommand.ExecuteReaderAsync重载,并传递CommandBehavior.CloseConnection值:

执行命令时,当关联的 DataReader 对象关闭时,关联的连接对象将关闭。

然后你可以只希望调用者按照规则玩,及时调用DataReader.Close方法,并且在对象被垃圾回收之前不会让连接打开。因此,公开开放DataReader应被视为一种极端的性能优化技术,应谨慎使用。

顺便说一句,如果您返回IEnumerable<IDataRecord>而不是IAsyncEnumerable<IDataRecord>,您会遇到同样的问题。

为了添加到其他答案中,您可以将实用程序方法设为通用,并添加一个投影委托 ,Func<IDataRecord, T> projection,作为参数,如下所示:

public static async IAsyncEnumerable<T> GetRecordsAsync<T>(
string connectionString, SqlParameter[] parameters, string commandText,
Func<IDataRecord, T> projection, // Parameter here
[EnumeratorCancellation] CancellationToken cancellationToken)
{
...
yield return projection(reader); // Projected here
...
}

然后在调用时传入 lambda 或引用如下方法组:

public static object GetVideoId(IDataRecord dataRecord)
=> dataRecord["VideoID"];

这样:

GetRecordsAsync(connectionString, parameters, commandText, GetVideoId, CancellationToken.None);

在 2021 年底,我遇到了这个确切的问题。我找不到一个完整的示例,所以我只是弄乱了我能找到的东西,直到我找到一些东西来工作。

这是我的代码完成,虽然很简单(以便稍后可以扩展它),以及一些注释,详细介绍了我在此过程中进入的一些坑:

// This function turns each "DataRow" into an object of Type T and yields
// it. You could alternately yield the reader itself for each row.
// In this example, assume sqlCommandText and connectionString exist.
public async IAsyncEnumerable<T> ReadAsync<T>( Func<SqlDataReader, T> func )
{
// we need a connection that will last as long as the reader is open,
// alternately you could pass in an open connection.
using SqlConnection connection = new SqlConnection( connectionString );
using SqlCommand cmd = new SqlCommand( sqlCommandText, connection );
await connection.OpenAsync();
var reader = await cmd.ExecuteReaderAsync();
while( await reader.ReadAsync() )
{
yield return func( reader );
}
}

然后在(异步)代码的任何其他部分中,您可以在await foreach循环中调用函数:

private static async Task CallIAsyncEnumerable()
{
await foreach( var category in ReadAsync( ReaderToCategory ) )
{
// do something with your category; save it in a list, write it to disk,
// make an HTTP call ... the whole world is yours!
}
}
// an example delegate, which I'm passing into ReadAsync
private static Category ReaderToCategory( SqlDataReader reader )
{
return new Category()
{
Code = ( string )reader[ "Code" ],
Group = ( string )reader[ "Group" ]
};
}

我发现的其他几件事:你不能从tryyield,但你可以将所有内容(包括)cmd.ExecuteReaderAsync()塞进一个try,或者一个返回DataReader的单独方法。或者您可以将await foreach包裹在try块中;我认为问题在于屈服于尝试之外的呼叫者(这是有道理的,在你考虑之后)。

如果使用其他方法生成读取器,请将连接传递到该方法中,以便可以控制其生存期。如果您的方法创建连接,执行命令并返回SqlDataReader,则连接将关闭(如果使用了"using"),然后才能从读取器读取。同样,如果你考虑一下,这是完全有道理的,但它让我绊倒了几分钟。

祝你好运,我希望这对将来对其他人有所帮助!

我推荐这样的东西:

public async IAsyncEnumerable<Customer> GetByCity(string city)
{
const string sql = "SELECT * FROM Customers WHERE City = @City";

using var command = connection.CreateCommand();
command.CommandText = sql;
command.Parameters.AddWithValue("@City", city);

if (connection.State == ConnectionState.Closed)
await connection.OpenAsync();

using SqlDataReader reader = await command.ExecuteReaderAsync();

while (await reader.ReadAsync())
{
yield return Map(reader);
}
}

private static Customer Map(SqlDataReader reader) => new Customer
{
FirstName = (string) reader["FirstName"],
LastName  = (string) reader["LastName"]
}

await foreach (var customer in customerRepository.GetByCity("Warsaw"))
{
// ...
}

相关内容

  • 没有找到相关文章

最新更新