根据请求提供数据,延迟加载(相当于收益回报)



我有一个从数据库加载的数据记录流。我无法将它们全部存储并加载到内存中,因为它们有数百万个。调用者应该一个接一个地处理记录(当然我不能保证)。

我的第一次尝试是返回IEnumerable<Records>的惰性序列,该序列将根据需要加载并由yield return语句返回。

但是我不能在这个方法中使用await/async(用于从数据库中获取数据),因为yield return需要返回类型IEnumerable<>。因此,我无法使用asyncTask<IEnumerable<>>

读到这篇文章后,我决定尝试Reactive Extensions,因为我可以等待异步方法并返回IObservable<>

但据我所知,一旦有人订阅了我的observable,就会调用提取数据的方法,它会一次提取所有数据。

这就是我的方法的一部分,我的方法看起来像:

IList<int> ids = (...);
return Observable.Create<NitemonkeyRegistration>(async obs => 
    {
        using (SqlDataReader reader = await command.ExecuteReaderAsync())
            {
                if (!reader.HasRows)
                    obs.OnCompleted();
                while (await reader.ReadAsync())
                    ids.Add(reader.GetInt32(reader.GetOrdinal("RegistrationId")));
                for (int i = 0; i < ids.Count; i += 1000)
                {
                    //heavy database operations
                    var registrations = await GetRegistrationsByIds(connection, ids.Skip(i).Take(1000));
                    foreach (var pulledReg in registrations)
                    {
                        obs.OnNext(pulledReg);
                    }
                }
            }
     });

我可以让调用者控制吗?这样,当他在可观察对象上调用.Next()时,我的代码就会按需提取数据

如何使用反应式扩展实现类似于yield return的东西

更新

这是我的消费者代码:

var cancellationTokenSource = new CancellationTokenSource();
await Observable.ForEachAsync<NitemonkeyRegistration>(niteMonkeySales, async (record, i) =>
            {
                try
                {
                     await SomethingAwaitableWhichCanTakeSeconds(record);
                }
                catch(Exception e)
                {
                    // add logging
                    // this cancels the loop but also the IObservable
                    cancellationTokenSource.Cancel();
                    // can't rethrow because line
                    // above will cause errored http response already created
                }
            }, cancellationTokenSource.Token);

这样做的问题是,推送新记录时没有等待不可执行的任务完成。我可以使用.Wait()而不是异步lambda来完成此操作,但线程将浪费在等待漫长的网络操作完成上。

可能很重要:这是一个ASP.NET WEB API服务

Rx允许描述"推送序列",其中生产者将值推送给观察者。如果您的需求是从源代码中"提取"值,我认为您正在寻找的是交互式扩展异步库(请查看此Channel 9视频)。它定义了IAsyncEnumerable<T>类型和一整套LINQ运算符,这允许用异步行为描述基于拉的序列(但缺点是yield return不适用于该类型(至少),因此您可能需要编写自己的IAsyncEnumerator<T>实现)。

Rx.NET目前没有很多内置的背压运算符。

使用类似TPL数据流的东西可能更适合您的问题。

无论如何,我认为你可以使用BlockingCollection来限制你从数据库中提取的速率:

// maximum of 10 items in buffer
var buffer = new BlockingCollection<NitemonkeyRegistration>(10);
niteMonkeySales.Subscribe(t => buffer.Add(t), () => buffer.CompleteAdd());
foreach (var item in buffer.GetConsumingEnumerable())
{
    try
    {
         await SomethingAwaitableWhichCanTakeSeconds(record);
    }
    catch(Exception e)
    {
        // add logging
        // this cancels the loop but also the IObservable
        cancellationTokenSource.Cancel();
        // can't rethrow because line
        // above will cause errored http response already created
    }
}

您需要使用反应式扩展吗?

你的第一次尝试可能是在正确的轨道上。

看看另一个问题的答案。

问题可能是查询,而不是客户端代码。

如链接问题中所述,您可能需要重写查询,以确保它将数据正确地流式传输到客户端。


更新:

你应该试着把GetRegistrationsById分成两块。

  1. 获取SqlDataReader以运行查询。你可以await这个部分
  2. 使用返回的SqlDataReader,并使用yield return对其进行迭代

下面是一个示例,它松散地基于您的代码示例。

    IList<int> ids = new List<int>();
    private async void doWork()
    {
        var connection = new SqlConnection(...);
        connection.Open();
        SqlCommand command = new SqlCommand("SELECT registrationId FROM someTable", connection);
        using (SqlDataReader reader = await command.ExecuteReaderAsync())
        {
            while (await reader.ReadAsync())
            {
                ids.Add(reader.GetInt32(reader.GetOrdinal("RegistrationId")));
            }
            reader.Close();
            //heavy database operations
            // Part 1 of whatever GetRegistrationsByIds does would go into GetRegistrationReader().
            var registrationReader = await Task.Run(() => GetRegistrationReader(connection, ids));
            // Part 2 of whatever GetRegistrationsByIds does for each 
            // Registration would go into GetRegistrations().
            var registrationEnumerator = GetRegistrations(orderReader);
            foreach (var registration in registrationEnumerator)
            {
                // Do whatever you need to do for each registration
                listBox1.Items.Add(registration.Id);
            }
        }
    }
    private IEnumerable<Registration> GetRegistrations(SqlDataReader reader)
    {
        while (reader.Read())
        {
            // You would do whatever you need to do to each registration here.
            var registration = new Registration{ Id = reader.GetInt32(reader.GetOrdinal("RegistrationId")) };
            yield return registration;
        }
    }
    private SqlDataReader GetRegistrationReader(SqlConnection connection, IList<int> ints)
    {
        // Some query that returns a lot of rows.
        // Ideally it would written to stream directly from the 
        // database server, rather than buffer the data to the client
        // side.
        SqlCommand command = new SqlCommand("SELECT * from registrations", connection);
        return command.ExecuteReader();
    }
    internal class Registration
    {
        public int Id;
        // ... other fields, etc.
    }

最新更新