Akka溪流-结合最新的操作



我想像这里描述的那样将-latest与Akka Streams结合起来。

我不知道该怎么做,请帮帮我!

谢谢,瑞安。

我只是快速地实现了它。不确定它是否没有bug,但值得一试:)https://gist.github.com/tg44/2e75d45c234ca02d91cfdac35f41a5a2欢迎在要点下留言!

正如我们在gitter通道上所说的那样,它无法通过构建阶段实现,但您可以使用自定义阶段编写功能。您将需要两个输入和一个输出(可以扩展到N个输入),所以它的形状是风扇。

我将传入的元素保存到选项中,当一个输入准备好时(即。发送一个元素)我将给定的元素保存到选项中。每当输出需要一个元素时(我们已经从两个输入中获得了一个元素),我将选项中的值作为元组提供给它。这是反压感知方法。

对于背压方法(您生成所有对),您需要处理等待"其他"输出元素,然后是最后一个输出元素,并且需要处理输入拉。我认为我的实现仍然不能处理太快的生产者和缓慢的消费者情况(我们可以错过一个元素,可以用emit处理),如果两个输入多次产生相同的元素,可能会死锁(也许emit也可以处理这个)。

如果你想扩展我的代码功能或想写其他自定义阶段读这个:http://doc.akka.io/docs/akka/2.5/scala/stream/stream-customize.html

相关内容

  • 没有找到相关文章

最新更新