尝试将现有数据访问代码转换为异步代码并遇到 Rx,因为您无法在方法主体中返回带有yield return
的Task<IEnumerable<T>>
。
我写了这个,但不确定它的异步,所以指针感激地收到
public class EmployeeRepository : IEmployeeRepository
{
public IAsyncEnumerable<Employee> GetEmployees()
{
return Enumerable().ToAsyncEnumerable();
}
private IEnumerable<Employee> Enumerable()
{
using (var connection = new SqlConnection(ConfigurationManager.ConnectionStrings["DBConnString"].ConnectionString))
{
connection.Open();
using (var command = new SqlCommand(@"SELECT * FROM EMPLOYEES", connection))
{
using (var reader = command.ExecuteReader())
{
while (reader.Read())
{
yield return
new Employee()
{
Id = ReadField<int>(reader, "Id"),
Name = ReadField<string>(reader, "Name")
};
}
}
}
}
}
private static T ReadField<T>(IDataRecord reader, string fieldName)
{
var value = reader[fieldName];
return value == DBNull.Value ? default(T) : (T)value;
}
}
异步的。 ToAsyncEnumerable
创建一个简单的适配器,该适配器阻止每次调用MoveNext
。返回这样的异步适配器是一种不好的做法,与执行Task.Run(() => BlockingMethod())
相同。它向用户隐藏了实现效率低下的问题,如果他们知道它的存在,他们可能已经能够以更好的方式解决。
IAsyncEnumerable
没有语言集成的 yield 特性,但它是可以模拟的。我有代码来做到这一点,但公平警告这会产生一些开销:
IAsyncEnumerable<Employee> async = AsyncEnumerableEx.Create<Employee>(
async (y, cancellationToken) =>
{
using (var connection = new SqlConnection(ConfigurationManager
.ConnectionStrings["DBConnString"].ConnectionString))
{
await connection.OpenAsync(cancellationToken);
using (var command = new SqlCommand(@"SELECT * FROM EMPLOYEES",
connection))
{
using (var reader = await
command.ExecuteReaderAsync(cancellationToken))
{
while (await reader.ReadAsync(cancellationToken))
{
await y.YieldReturn(new Employee()
{
Id = ReadField<int>(reader, "Id"),
Name = ReadField<string>(reader, "Name")
});
}
}
}
}
});
如果你想使用实际的Rx,它内置了一个几乎相同的Observable.Create
实用程序。由于削减了一些等待开销,它将更有效率。
IObservable<Employee> async = Observable.Create<Employee>(
async (obs, cancellationToken) =>
{
using (var connection = new SqlConnection(ConfigurationManager
.ConnectionStrings["DBConnString"].ConnectionString))
{
await connection.OpenAsync(cancellationToken);
using (var command = new SqlCommand(@"SELECT * FROM EMPLOYEES",
connection))
{
using (var reader = await
command.ExecuteReaderAsync(cancellationToken))
{
while (await reader.ReadAsync(cancellationToken))
{
obs.OnNext(new Employee()
{
Id = ReadField<int>(reader, "Id"),
Name = ReadField<string>(reader, "Name")
});
}
}
}
}
});
如果你想使用 Rx,试试这样的事情:
public IObservable<Employee> GetEmployees()
{
return Observable.Create<Employee>(o =>
Observable.Using(() => new SqlConnection(ConfigurationManager
.ConnectionStrings["DBConnString"].ConnectionString),
connection =>
Observable.Using(() =>
{
connection.Open();
return new SqlCommand(
@"SELECT * FROM EMPLOYEES", connection);
},
command =>
Observable.Using(() => command.ExecuteReader(),
reader =>
Observable.Generate(
0,
x => reader.Read(),
x => x,
x => new Employee()
{
Id = ReadField<int>(reader, "Id"),
Name = ReadField<string>(reader, "Name")
}, Scheduler.Default)))).Subscribe(o));
}