"来源";以及";溢流策略";是阿卡河的一部分。如何在FS2中重新实现以下代码?
object Topic {
def apply: Topic = {
val (a1, a2) = Source.queue[Message[_]](100, OverflowStrategy.backpressure).preMaterialize()
val (b1, b2) = Source.queue[Signal](100, OverflowStrategy.backpressure).preMaterialize()
new Topic(a1, a2, b1, b2)
}
}
在fs2中有几个构造函数,例如Queue.bounded[F, M](maxSize)
或Queue.circularBuffer[F, M](maxSize)
。对于背压,应使用Queue.bounded[F, M](maxSize)
。它的作用与akka队列的背压策略相同。
方法的完整列表:https://www.javadoc.io/doc/co.fs2/fs2-core_2.13/2.2.2/fs2/concurrent/Queue$.html
你会有这样的东西:
object Topic {
def apply[F]: F[Topic] = for {
a <- Queue.bounded[F, Message[_]](100)
b <- Queue.bounded[F, Signal](100)
} yield new Topic(a, b)
}
此外,您可以考虑使用FS2中的预构建主题类型。https://www.javadoc.io/doc/co.fs2/fs2-core_2.13/2.2.2/fs2/concurrent/Topic.html
Topic允许您将由任意数量的发布者发布的A分发给任意数量的订阅者。Topic具有内置的背压支持,实现为允许订阅者排队的最大绑定(maxQueued)。一旦该绑定被命中,发布可能会在语义上阻塞,直到滞后的订阅者消耗掉它的一些排队元素。