我有一小段代码,它模拟了使用大型对象(即巨大的byte[]
)的流。对于序列中的每个项,都会调用一个异步方法来获得一些结果。问题是什么?实际上,它抛出OutOfMemoryException
。
与LINQPad(C#程序)兼容的代码:
void Main()
{
var selectMany = Enumerable.Range(1, 100)
.Select(i => new LargeObject(i))
.ToObservable()
.SelectMany(o => Observable.FromAsync(() => DoSomethingAsync(o)));
selectMany
.Subscribe(r => Console.WriteLine(r));
}
private static async Task<int> DoSomethingAsync(LargeObject lo)
{
await Task.Delay(10000);
return lo.Id;
}
internal class LargeObject
{
public int Id { get; }
public LargeObject(int id)
{
this.Id = id;
}
public byte[] Data { get; } = new byte[10000000];
}
似乎它同时创建了所有对象。我怎样才能用正确的方法呢?
其基本思想是调用DoSomethingAsync以便为每个对象获得一些结果,所以这就是我使用SelectMany的原因。为了简化,我只介绍了一个Task.Delay,但在现实生活中,它是一个可以同时处理一些项目的服务,所以我想介绍一些并发机制来利用它。
请注意,理论上,一次处理少量项目不应该填满内存。事实上,我们只需要每个"大对象"就可以获得DoSomethingAsync方法的结果。在该点之后,不再使用大对象
我觉得自己在重复自己。与您上一个问题和我上一个答案类似,您需要做的是限制bigObjects的数量™以并发创建。
要做到这一点,您需要将对象创建和处理结合起来,并将其放在同一个线程池中。现在的问题是,我们使用异步方法来允许线程在异步方法运行时做其他事情。由于您的慢速网络调用是异步的,因此您的(快速)对象创建代码将使创建大型对象的速度过快。
相反,我们可以使用Rx通过将对象创建与异步调用相结合来计算并发运行的Observable的数量,并使用.Merge(maxConcurrent)
来限制并发。
另外,我们还可以设置执行查询的最短时间。只需要Zip
,它只需要最小的延迟。
static void Main()
{
var selectMany = Enumerable.Range(1, 100)
.ToObservable()
.Select(i => Observable.Defer(() => Observable.Return(new LargeObject(i)))
.SelectMany(o => Observable.FromAsync(() => DoSomethingAsync(o)))
.Zip(Observable.Timer(TimeSpan.FromMilliseconds(400)), (el, _) => el)
).Merge(4);
selectMany
.Subscribe(r => Console.WriteLine(r));
Console.ReadLine();
}
private static async Task<int> DoSomethingAsync(LargeObject lo)
{
await Task.Delay(10000);
return lo.Id;
}
internal class LargeObject
{
public int Id { get; }
public LargeObject(int id)
{
this.Id = id;
Console.WriteLine(id + "!");
}
public byte[] Data { get; } = new byte[10000000];
}
它似乎同时创建了所有对象。
是的,因为你正在同时创建它们。
如果我简化你的代码,我可以告诉你为什么:
void Main()
{
var selectMany =
Enumerable
.Range(1, 5)
.Do(x => Console.WriteLine($"{x}!"))
.ToObservable()
.SelectMany(i => Observable.FromAsync(() => DoSomethingAsync(i)));
selectMany
.Subscribe(r => Console.WriteLine(r));
}
private static async Task<int> DoSomethingAsync(int i)
{
await Task.Delay(1);
return i;
}
运行此操作将生成:
1.2.3.4.5.4.3.5.2.1.
由于Observable.FromAsync
,您允许源在返回任何结果之前运行到完成。换句话说,您正在快速构建所有大型对象,但处理它们的速度较慢。
您应该允许Rx同步运行,但在默认调度程序上运行,这样您的主线程就不会被阻塞。然后,代码将在没有任何内存问题的情况下运行,并且您的程序将在主线程上保持响应。
这是这个的代码:
var selectMany =
Observable
.Range(1, 100, Scheduler.Default)
.Select(i => new LargeObject(i))
.Select(o => DoSomethingAsync(o))
.Select(t => t.Result);
(我已经用Observable.Range(1, 100)
有效地替换了Enumerable.Range(1, 100).ToObservable()
,因为这也有助于解决一些问题。)
我尝试过测试其他选项,但到目前为止,任何允许DoSomethingAsync
异步运行的选项都会出现内存不足错误。
ConcatMap支持开箱即用。我知道这个操作符在.net中不可用,但您可以使用Concat操作符进行同样的操作,该操作符会推迟订阅每个内部源,直到上一个完成。
您可以通过以下方式引入时间间隔延迟:
var source = Enumerable.Range(1, 100)
.ToObservable()
.Zip(Observable.Interval(TimeSpan.FromSeconds(1)), (i, ts) => i)
.Select(i => new LargeObject(i))
.SelectMany(o => Observable.FromAsync(() => DoSomethingAsync(o)));
因此,它不是一次提取所有100个整数,立即将它们转换为LargeObject
,然后对所有100调用DoSomethingAsync
,而是将整数逐个滴出,每个间隔一秒。
这就是TPL+Rx解决方案的样子。不用说,它没有单独的Rx或单独的TPL那么优雅。然而,我认为这个问题不太适合Rx:
void Main()
{
var source = Observable.Range(1, 100);
const int MaxParallelism = 5;
var transformBlock = new TransformBlock<int, int>(async i => await DoSomethingAsync(new LargeObject(i)),
new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = MaxParallelism });
source.Subscribe(transformBlock.AsObserver());
var selectMany = transformBlock.AsObservable();
selectMany
.Subscribe(r => Console.WriteLine(r));
}