Akka到Scala Cats FS2的迁移



"来源";以及";溢流策略";是阿卡河的一部分。如何在FS2中重新实现以下代码?

object Topic {
def apply: Topic = {
val (a1, a2) = Source.queue[Message[_]](100, OverflowStrategy.backpressure).preMaterialize()
val (b1, b2) = Source.queue[Signal](100, OverflowStrategy.backpressure).preMaterialize()
new Topic(a1, a2, b1, b2)
}
}

在fs2中有几个构造函数,例如Queue.bounded[F, M](maxSize)Queue.circularBuffer[F, M](maxSize)。对于背压,应使用Queue.bounded[F, M](maxSize)。它的作用与akka队列的背压策略相同。

方法的完整列表:https://www.javadoc.io/doc/co.fs2/fs2-core_2.13/2.2.2/fs2/concurrent/Queue$.html

你会有这样的东西:

object Topic {
def apply[F]: F[Topic] = for {
a <- Queue.bounded[F, Message[_]](100)
b <- Queue.bounded[F, Signal](100)
} yield new Topic(a, b)
}

此外,您可以考虑使用FS2中的预构建主题类型。https://www.javadoc.io/doc/co.fs2/fs2-core_2.13/2.2.2/fs2/concurrent/Topic.html

Topic允许您将由任意数量的发布者发布的A分发给任意数量的订阅者。Topic具有内置的背压支持,实现为允许订阅者排队的最大绑定(maxQueued)。一旦该绑定被命中,发布可能会在语义上阻塞,直到滞后的订阅者消耗掉它的一些排队元素。

相关内容

  • 没有找到相关文章

最新更新