如何使用反应式限制消耗序列



我们有一个应用程序,其中我们有一个物化的项目数组,我们将通过反应式管道处理这些项目。看起来有点像这样

EventLoopScheduler eventLoop = new EventLoopScheduler();
IScheduler concurrency = new TaskPoolScheduler(
    new TaskFactory(
        new LimitedConcurrencyLevelTaskScheduler(threadCount)));
IEnumerable<int> numbers = Enumerable.Range(1, itemCount);
// 1. transform on single thread
IConnectableObservable<byte[]> source = 
    numbers.Select(Transform).ToObservable(eventLoop).Publish();
// 2. naive parallelization, restricts parallelization to Work 
// only; chunk up sequence into smaller sequences and process
// in parallel, merging results
IObservable<int> final = source.
    Buffer(10).
    Select(
        batch =>
        batch.
        ToObservable(concurrency).
        Buffer(10).
        Select(
            concurrentBatch =>
            concurrentBatch.
            Select(Work).
            ToArray().
            ToObservable(eventLoop)).
        Merge()).
    Merge();
final.Subscribe();
source.Connect();
Await(final).Wait();

如果你真的很想玩这个,替身方法看起来像

private async static Task Await(IObservable<int> final)
{
    await final.LastOrDefaultAsync();
}
private static byte[] Transform(int number)
{
    if (number == itemCount)
    {
        Console.WriteLine("numbers exhausted.");
    }
    byte[] buffer = new byte[1000000];
    Buffer.BlockCopy(bloat, 0, buffer, 0, bloat.Length);
    return buffer;
}
private static int Work(byte[] buffer)
{
    Console.WriteLine("t {0}.", Thread.CurrentThread.ManagedThreadId);
    Thread.Sleep(50);
    return 1;
}

一点解释。 Range(1, itemCount)模拟从数据源具体化的原始输入。 Transform模拟每个输入必须经历的扩充过程,并导致更大的内存占用。 Work是一个"漫长"的过程,它对转换后的输入进行操作。

理想情况下,我们希望最大限度地减少系统同时持有的转换输入的数量,同时通过并行化Work来最大化吞吐量。内存中转换的输入数应为批大小(10以上(乘以并发工作线程数(threadCount(。

因此,对于 5 个线程,我们应该在任何给定时间保留 50 个Transform项目;如果像这里一样,转换是一个 1MB 字节的缓冲区,那么我们预计整个运行过程中的内存消耗约为 50MB。

我发现的完全不同。也就是说,反应式急切地消耗所有numbers,并提前Transform它们(如numbers exhausted.消息所示(,导致前期出现大量内存峰值(@1GB 1000 itemCount(。

我的基本问题是:有没有办法实现我需要的(即最小化消耗,由多线程批处理限制(?

更新:对不起,反转詹姆斯; 起初,我不认为Paulpdaniels和Enigmativity的组成Work(Transform)应用(这与我们实际实现的性质有关,它比上面提供的简单场景更复杂(,但是,经过一些进一步的实验,我也许能够应用相同的原则:即推迟转换直到批处理执行。

你的代码犯了几个错误,抛弃了你所有的结论。

首先,您已经完成了此操作:

IEnumerable<int> numbers = Enumerable.Range(1, itemCount);

您已经使用了Enumerable.Range这意味着当您调用numbers.Select(Transform)时,您将以单个线程可以承受的速度烧毁所有numbers。Rx 甚至没有机会做任何工作,因为到目前为止,您的管道是完全可枚举的。

下一个问题是在您的订阅中:

final.Subscribe();
source.Connect();
Await(final).Wait();

因为您调用final.Subscribe() & Await(final).Wait();所以您正在为可观察final创建两个单独的订阅。

由于中间有一个source.Connect(),因此第二个订阅可能会错过值。

所以,让我们试着去掉这里发生的所有问题,看看我们是否可以解决问题。

如果你归结为这个:

IObservable<int> final =
    Observable
        .Range(1, itemCount)
        .Select(n => Transform(n))
        .Select(bs => Work(bs));

事情进展顺利。数字在最后就用尽了,在我的机器上处理 20 个项目大约需要 1 秒。

但这是按顺序处理所有内容。Work步骤为Transform提供了背压,以减慢其消耗数字的速度。

让我们添加并发性。

IObservable<int> final =
    Observable
        .Range(1, itemCount)
        .Select(n => Transform(n))
        .SelectMany(bs => Observable.Start(() => Work(bs)));

这会在 0.284 秒内处理 20 个项目,并且在处理 5 个项目后数字会耗尽。数字上不再有任何背压。基本上,调度程序将所有工作交给Observable.Start因此可以立即为下一个数字做好准备。

让我们减少并发性。

IObservable<int> final =
    Observable
        .Range(1, itemCount)
        .Select(n => Transform(n))
        .SelectMany(bs => Observable.Start(() => Work(bs), concurrency));

现在,20 个项目在 0.5 秒内得到处理。在数字耗尽之前,只有两个得到处理。这是有道理的,因为我们将并发性限制为两个线程。但是,这些数字的消耗仍然没有背压,所以它们很快就会被咀嚼。

说了这么多,我试图用适当的背压构造一个查询,但我找不到方法。关键在于Transform(...)的执行速度比Work(...)快得多,因此完成速度要快得多。

所以对我来说显而易见的举动是这样的:

IObservable<int> final =
    Observable
        .Range(1, itemCount)
        .SelectMany(n => Observable.Start(() => Work(Transform(n)), concurrency));

这直到最后才完成数字,并且它将处理限制为两个线程。它似乎为你想要的事情做了正确的事情,除了我不得不一起做Work(Transform(...))

你想要限制你正在做的工作量这一事实表明你应该提取数据,而不是把它强加给你。在这种情况下,我会忘记使用 Rx,因为从根本上说,您描述的不是反应式应用程序。此外,Rx 最适合连续处理项目;它使用顺序事件流。

为什么不只保持数据源可枚举,并使用PLinq,Parallel.ForEach或DataFlow?所有这些听起来都更适合您的问题。

正如@JamesWorld所说,很可能你想使用 PLinq 来执行这个任务,这实际上取决于你是真的对真实场景中的数据做出反应,还是只是在迭代它。

如果选择使用响应式路由,则可以使用 Merge 来控制发生的并行化级别:

var source = numbers
  .Select(n => 
          Observable.Defer(() => Observable.Start(() => Work(Transform(n)), concurrency)))
  //Maximum concurrency
  .Merge(10)
  //Schedule all the output back onto the event loop scheduler
  .ObserveOn(eventLoop);

上面的代码将首先消耗所有数字(抱歉,没有办法避免这种情况(,但是,通过将处理包装在Defer中并随后使用限制并行化的Merge,一次只能运行x数量的项目。 Start()将调度程序作为第二个参数,用于执行到提供的方法。最后,由于您基本上只是将Transform的值推入Work因此我在Start方法中编写了它们。

作为旁注,您可以await一个Observable,它将等效于您拥有的代码,即:

await source; //== await source.LastAsync();

相关内容

  • 没有找到相关文章

最新更新