如何使用mapAsync在akka流中使用分组子流



我需要做一些类似的事情https://github.com/typesafehub/activator-akka-stream-scala/blob/master/src/main/scala/sample/stream/GroupLogFile.scala

我的问题是,我有一个未知数量的组,如果mapAsync的并行数少于我得到的组的数量和错误在最后一个sink

拆除山姆SynchronousFileSink(/用户//dev/项目/akka-streams/目标/log-ERROR.txt)由于上游错误(akka.stream.impl.StreamSubscriptionTimeoutSupport不久2美元美元)

我尝试在akka streams的模式指南中建议的中间放置一个缓冲区http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0/scala/stream-cookbook.html

groupBy {
  case LoglevelPattern(level) => level
  case other                  => "OTHER"
}.buffer(1000, OverflowStrategy.backpressure).
  // write lines of each group to a separate file
  mapAsync(parallelism = 2) {....

,但结果相同

扩展jrudolph的评论,这是完全正确的…

在本实例中不需要mapAsync。作为一个基本示例,假设您有一个元组

的源
import akka.stream.scaladsl.{Source, Sink}
def data() = List(("foo", 1),
                  ("foo", 2),
                  ("bar", 1),
                  ("foo", 3),
                  ("bar", 2))
val originalSource = Source(data)

您可以执行groupBy来创建一个Source of Sources

def getID(tuple : (String, Int)) = tuple._1
//a Source of (String, Source[(String, Int),_])
val groupedSource = originalSource groupBy getID

每个分组源都可以用map并行处理,不需要任何花哨的东西。下面是在一个独立流中对每个分组求和的示例:

import akka.actor.ActorSystem
import akka.stream.ACtorMaterializer
implicit val actorSystem = ActorSystem()
implicit val mat = ActorMaterializer()
import actorSystem.dispatcher
def getValues(tuple : (String, Int)) = tuple._2
//does not have to be a def, we can re-use the same sink over-and-over
val sumSink = Sink.fold[Int,Int](0)(_ + _)
//a Source of (String, Future[Int])
val sumSource  = 
  groupedSource map { case (id, src) => 
    id -> {src map getValues runWith sumSink} //calculate sum in independent stream
  }

现在所有的"foo"数与所有的"bar"数并行求和。

mapAsync是使用当你有一个封装的函数,返回一个Future[T],你试图发出一个T代替;你的问题不是这样的。此外,mapAsync涉及等待结果,这不是响应式的…

相关内容

  • 没有找到相关文章