如何设置akka流发布服务器的inputBuffer



我在一个上下文中使用Akka流,其中单个源的接收器会来来去去。出于这个原因,我从一个来源创建了一个发布者,并在需要时附加订阅者:

val publisher= mySource.runWith(Sink.publisher(true))

带有

publisher.subscribe(subscriber1)// There will be others

一些订阅服务器将比其他订阅服务器更快,我希望允许更快的订阅服务器独立于最慢的订阅服务器,至少在发布服务器的输入缓冲区允许的范围内进行扩展。此缓冲区由Sink.publisher(true)方法上的注释描述:

如果fanouttrue,则物化的Publisher将支持多个Subscriber,并且为该阶段配置的inputBuffer的大小成为最快的[[org.reactivestreams.Subscriber]]在由于背压而减慢处理之前可以领先于最慢的元素的最大元素数。

我的问题是,我不知道如何为"这个阶段"设置这个inputBuffer值。我看到的最接近的描述在本文的Dropping Broadcast部分,但这似乎坚持使用Flow DSL。我相信我不能使用DSL,因为我需要不断地连接新的订阅服务器。

因此,我的整体流速率被最慢的订户所阻碍。我尝试做的一个相关方面是确保不同的订阅者在不同的线程上运行(而不创建显式参与者作为订阅者)。

它看起来像(对于Akka Streams 2.0.1):

Sink.asPublisher(true).addAttributes(Attributes.inputBuffer(initialSize, maxSize))

相关内容

  • 没有找到相关文章

最新更新