我有一个从数据库加载的数据记录流。我无法将它们全部存储并加载到内存中,因为它们有数百万个。调用者应该一个接一个地处理记录(当然我不能保证)。
我的第一次尝试是返回IEnumerable<Records>
的惰性序列,该序列将根据需要加载并由yield return
语句返回。
但是我不能在这个方法中使用await/async
(用于从数据库中获取数据),因为yield return
需要返回类型IEnumerable<>
。因此,我无法使用async
和Task<IEnumerable<>>
。
读到这篇文章后,我决定尝试Reactive Extensions,因为我可以等待异步方法并返回IObservable<>
。
但据我所知,一旦有人订阅了我的observable,就会调用提取数据的方法,它会一次提取所有数据。
这就是我的方法的一部分,我的方法看起来像:
IList<int> ids = (...);
return Observable.Create<NitemonkeyRegistration>(async obs =>
{
using (SqlDataReader reader = await command.ExecuteReaderAsync())
{
if (!reader.HasRows)
obs.OnCompleted();
while (await reader.ReadAsync())
ids.Add(reader.GetInt32(reader.GetOrdinal("RegistrationId")));
for (int i = 0; i < ids.Count; i += 1000)
{
//heavy database operations
var registrations = await GetRegistrationsByIds(connection, ids.Skip(i).Take(1000));
foreach (var pulledReg in registrations)
{
obs.OnNext(pulledReg);
}
}
}
});
我可以让调用者控制吗?这样,当他在可观察对象上调用.Next()
时,我的代码就会按需提取数据
如何使用反应式扩展实现类似于yield return
的东西
更新
这是我的消费者代码:
var cancellationTokenSource = new CancellationTokenSource();
await Observable.ForEachAsync<NitemonkeyRegistration>(niteMonkeySales, async (record, i) =>
{
try
{
await SomethingAwaitableWhichCanTakeSeconds(record);
}
catch(Exception e)
{
// add logging
// this cancels the loop but also the IObservable
cancellationTokenSource.Cancel();
// can't rethrow because line
// above will cause errored http response already created
}
}, cancellationTokenSource.Token);
这样做的问题是,推送新记录时没有等待不可执行的任务完成。我可以使用.Wait()而不是异步lambda来完成此操作,但线程将浪费在等待漫长的网络操作完成上。
可能很重要:这是一个ASP.NET WEB API服务
Rx允许描述"推送序列",其中生产者将值推送给观察者。如果您的需求是从源代码中"提取"值,我认为您正在寻找的是交互式扩展异步库(请查看此Channel 9视频)。它定义了IAsyncEnumerable<T>
类型和一整套LINQ运算符,这允许用异步行为描述基于拉的序列(但缺点是yield return不适用于该类型(至少),因此您可能需要编写自己的IAsyncEnumerator<T>
实现)。
Rx.NET目前没有很多内置的背压运算符。
使用类似TPL数据流的东西可能更适合您的问题。
无论如何,我认为你可以使用BlockingCollection
来限制你从数据库中提取的速率:
// maximum of 10 items in buffer
var buffer = new BlockingCollection<NitemonkeyRegistration>(10);
niteMonkeySales.Subscribe(t => buffer.Add(t), () => buffer.CompleteAdd());
foreach (var item in buffer.GetConsumingEnumerable())
{
try
{
await SomethingAwaitableWhichCanTakeSeconds(record);
}
catch(Exception e)
{
// add logging
// this cancels the loop but also the IObservable
cancellationTokenSource.Cancel();
// can't rethrow because line
// above will cause errored http response already created
}
}
您需要使用反应式扩展吗?
你的第一次尝试可能是在正确的轨道上。
看看另一个问题的答案。
问题可能是查询,而不是客户端代码。
如链接问题中所述,您可能需要重写查询,以确保它将数据正确地流式传输到客户端。
更新:
你应该试着把GetRegistrationsById
分成两块。
- 获取
SqlDataReader
以运行查询。你可以await
这个部分 - 使用返回的
SqlDataReader
,并使用yield return
对其进行迭代
下面是一个示例,它松散地基于您的代码示例。
IList<int> ids = new List<int>();
private async void doWork()
{
var connection = new SqlConnection(...);
connection.Open();
SqlCommand command = new SqlCommand("SELECT registrationId FROM someTable", connection);
using (SqlDataReader reader = await command.ExecuteReaderAsync())
{
while (await reader.ReadAsync())
{
ids.Add(reader.GetInt32(reader.GetOrdinal("RegistrationId")));
}
reader.Close();
//heavy database operations
// Part 1 of whatever GetRegistrationsByIds does would go into GetRegistrationReader().
var registrationReader = await Task.Run(() => GetRegistrationReader(connection, ids));
// Part 2 of whatever GetRegistrationsByIds does for each
// Registration would go into GetRegistrations().
var registrationEnumerator = GetRegistrations(orderReader);
foreach (var registration in registrationEnumerator)
{
// Do whatever you need to do for each registration
listBox1.Items.Add(registration.Id);
}
}
}
private IEnumerable<Registration> GetRegistrations(SqlDataReader reader)
{
while (reader.Read())
{
// You would do whatever you need to do to each registration here.
var registration = new Registration{ Id = reader.GetInt32(reader.GetOrdinal("RegistrationId")) };
yield return registration;
}
}
private SqlDataReader GetRegistrationReader(SqlConnection connection, IList<int> ints)
{
// Some query that returns a lot of rows.
// Ideally it would written to stream directly from the
// database server, rather than buffer the data to the client
// side.
SqlCommand command = new SqlCommand("SELECT * from registrations", connection);
return command.ExecuteReader();
}
internal class Registration
{
public int Id;
// ... other fields, etc.
}