我在一个上下文中使用Akka流,其中单个源的接收器会来来去去。出于这个原因,我从一个来源创建了一个发布者,并在需要时附加订阅者:
val publisher= mySource.runWith(Sink.publisher(true))
带有
publisher.subscribe(subscriber1)// There will be others
一些订阅服务器将比其他订阅服务器更快,我希望允许更快的订阅服务器独立于最慢的订阅服务器,至少在发布服务器的输入缓冲区允许的范围内进行扩展。此缓冲区由Sink.publisher(true)方法上的注释描述:
如果
fanout
是true
,则物化的Publisher
将支持多个Subscriber
,并且为该阶段配置的inputBuffer
的大小成为最快的[[org.reactivestreams.Subscriber]]在由于背压而减慢处理之前可以领先于最慢的元素的最大元素数。
我的问题是,我不知道如何为"这个阶段"设置这个inputBuffer值。我看到的最接近的描述在本文的Dropping Broadcast部分,但这似乎坚持使用Flow DSL。我相信我不能使用DSL,因为我需要不断地连接新的订阅服务器。
因此,我的整体流速率被最慢的订户所阻碍。我尝试做的一个相关方面是确保不同的订阅者在不同的线程上运行(而不创建显式参与者作为订阅者)。
它看起来像(对于Akka Streams 2.0.1):
Sink.asPublisher(true).addAttributes(Attributes.inputBuffer(initialSize, maxSize))