Why does my Akka stream stop processing a huge file (~250,000 lines of strings) but work for small files?



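My stream works for smaller files of 1,000 lines, but it stops when I test it on a large file (~12 MB, ~250,000 lines). I tried applying backpressure with a buffer and throttling it, and the result is still the same...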

Here is my stream:

```scala
import java.io.File
import akka.stream.{ClosedShape, FlowShape}
import akka.stream.scaladsl.{Flow, GraphDSL, RunnableGraph, Sink, Source}
import scala.util.Try

// ActorSystemContainer and User are my own classes (not shown here).
class UserDataStreaming(usersFile: File) {
  implicit val system = ActorSystemContainer.getInstance().getSystem
  implicit val materializer = ActorSystemContainer.getInstance().getMaterializer
  def startStreaming() = {
    val graph = RunnableGraph.fromGraph(GraphDSL.create() {
      implicit builder =>
        import GraphDSL.Implicits._ // provides the ~> operator
        val usersSource = builder.add(Source.fromIterator(() => usersDataLines)).out
        val stringToUserFlowShape: FlowShape[String, User] = builder.add(csvToUser)
        val averageAgeFlowShape: FlowShape[User, (String, Int, Int)] = builder.add(averageUserAgeFlow)
        val averageAgeSink = builder.add(Sink.foreach(averageUserAgeSink)).in
        usersSource ~> stringToUserFlowShape ~> averageAgeFlowShape ~> averageAgeSink
        ClosedShape
    })
    graph.run()
  }
  // Skip the CSV header line
  val usersDataLines = scala.io.Source.fromFile(usersFile, "ISO-8859-1").getLines().drop(1)
  val csvToUser = Flow[String].map(_.split(";").map(_.trim)).map(csvLinesArrayToUser)
  def csvLinesArrayToUser(line: Array[String]) = User(line(0), line(1), line(2))
  def averageUserAgeSink(source: Any): Unit = {
    source match {
      case (age: String, count: Int, totalAge: Int) => println(s"age = $age; Average reader age is: ${Try(totalAge/count).getOrElse(0)} count = $count and total age = $totalAge")
      case bad => println(s"Bad case: $bad")
    }
  }
  def averageUserAgeFlow = Flow[User].fold(("", 0, 0)) {
    (nums: (String, Int, Int), user: User) =>
      var counter: Option[Int] = None
      var totalAge: Option[Int] = None
      // Ages are quoted in the CSV, so strip the surrounding quotes before parsing
      val ageInt = Try(user.age.substring(1, user.age.length-1).toInt)
      if (ageInt.isSuccess) {
        counter = Some(nums._2 + 1)
        totalAge = Some(nums._3 + ageInt.get)
      }
      else {
        counter = Some(nums._2 + 0)
        totalAge = Some(nums._3 + 0)
      }
      //println(counter.get)
      (user.age, counter.get, totalAge.get)
  }
}
```

Here is my main:

```scala
import java.io.File

object Main {
  def main(args: Array[String]): Unit = {
    implicit val system = ActorSystemContainer.getInstance().getSystem
    implicit val materializer = ActorSystemContainer.getInstance().getMaterializer
    val usersFile = new File("data/BX-Users.csv")
    println(usersFile.length())
    val userDataStreamer = new UserDataStreaming(usersFile)
    userDataStreamer.startStreaming()
  }
}
```

Most likely an error occurs on one of the lines of the CSV file. When that happens, the stream fails and stops. Try defining the flow like this:

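```scala
import akka.stream.{ActorAttributes, Supervision}

val csvToUser = Flow[String]
  .map(_.split(";").map(_.trim))
  .map(csvLinesArrayToUser)
  .withAttributes(ActorAttributes.supervisionStrategy {
    case ex: Throwable =>
      // Log the bad row's error, drop that element and keep the stream running
      system.log.error(ex, "Error parsing row")
      Supervision.Resume
  })
```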

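This way, any exception thrown while parsing a row is caught by the supervision strategy: the error is logged, the offending element is dropped, and the stream continues.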
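If you prefer not to rely on a supervision decider, a common alternative is to make the parsing step total, returning `Option[User]`, and drop bad rows explicitly. Below is a minimal, Akka-free sketch of such a parser. It assumes `User` has three `String` fields; the field names `id` and `location` and the helper `parseUser` are hypothetical (the question only shows `user.age`). In the stream it could replace `csvToUser` as `Flow[String].mapConcat(line => parseUser(line).toList)`.

```scala
// Hypothetical User: the question only shows that it has three fields, one named `age`.
final case class User(id: String, location: String, age: String)

object SafeCsv {
  // Returns None instead of throwing when a row has fewer than 3 fields,
  // which is the case where `line(2)` in csvLinesArrayToUser would throw.
  def parseUser(line: String): Option[User] = {
    val fields = line.split(";").map(_.trim)
    if (fields.length >= 3) Some(User(fields(0), fields(1), fields(2))) else None
  }

  def main(args: Array[String]): Unit = {
    // Hypothetical sample rows; the last one is malformed
    val lines = List(
      "\"1\";\"nyc, new york, usa\";NULL",
      "\"2\";\"stockton, california, usa\";\"18\"",
      "broken row"
    )
    val users = lines.flatMap(parseUser) // the malformed row is simply skipped
    println(users.size) // prints 2
  }
}
```

Unlike `Supervision.Resume`, this keeps the "skip bad rows" decision visible in the flow itself rather than in stage attributes.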

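If you use `Supervision.Stop` instead, the stream stops. Stop is also the default decider, which is why the original stream stopped.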
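Part of why the failure looks like a silent "stop" is that `graph.run()` on a graph built with `GraphDSL.create()` only returns `NotUsed`, so the error is never reported anywhere. Also, because `fold` emits only once upstream completes, the sink prints nothing at all when the stream fails midway. A minimal sketch for making the failure visible, assuming Akka 2.6+ (where an implicit `ActorSystem` is enough to run a stream; on 2.5 you would also need an implicit `ActorMaterializer`): keep the `Future[Done]` materialized by `Sink.foreach` and check how it completes. The object name, `runAndReport`, and the sample rows are hypothetical.

```scala
import akka.Done
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Sink, Source}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.util.{Failure, Success, Try}

object SurfaceStreamFailure {
  // Runs a small stream whose last row is malformed and reports how it ended.
  def runAndReport(): String = {
    // Assumes Akka 2.6+: the implicit ActorSystem provides the Materializer.
    implicit val system: ActorSystem = ActorSystem("surface-failure")
    try {
      // Hypothetical rows; "broken row" has only one field.
      val rows = List("\"1\";\"a\";\"20\"", "\"2\";\"b\";\"30\"", "broken row")

      // Same kind of parsing as csvToUser: indexing field 2 throws on a short row.
      val parse = Flow[String].map { line =>
        val fields = line.split(";").map(_.trim)
        (fields(0), fields(2))
      }

      // runWith keeps the sink's materialized Future[Done];
      // it fails with the upstream exception if any stage fails.
      val done: Future[Done] =
        Source.fromIterator(() => rows.iterator).via(parse).runWith(Sink.foreach(println))

      Try(Await.result(done, 10.seconds)) match {
        case Success(_)  => "stream completed"
        case Failure(ex) => s"stream failed: $ex"
      }
    } finally {
      system.terminate()
    }
  }

  def main(args: Array[String]): Unit = println(runAndReport())
}
```

With the GraphDSL version from the question, the equivalent is to pass the sink into `GraphDSL.create(sink) { implicit builder => sinkShape => ... }` so that `run()` returns the sink's future instead of `NotUsed`.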
