使用 Akka Streams, Scala 异步读取多个文件



我想读很多.CSV 文件异步返回自定义事例类的可迭代对象。

我可以通过 Akka Streams 和 How. 来实现这一点吗?

*我试图根据文档以某种方式平衡工作,但通过...

改用Actor是一种好的做法吗?(一个有子项的父Actor,每个子项读取一个文件,并返回一个可迭代对象给父级,然后父级将所有可迭代对象组合在一起?

与@paul答案基本相同,但略有改进

def files = new java.io.File("").listFiles().map(_.getAbsolutePath).to[scala.collection.immutable.Iterable]
Source(files).flatMapConcat( filename => //you could use flatMapMerge if you don't bother about line ordering
FileIO.fromPath(Paths.get(filename))
.via(Framing.delimiter(ByteString("n"), 256, allowTruncation = true).map(_.utf8String))
).map { csvLine =>
// parse csv here
println(csvLine)
}

首先,您需要阅读/了解Akka流的工作原理,包括Source,Flow和Sink。然后,您可以开始学习运算符。

若要并行执行多个操作,可以使用运算符mapAsync在其中指定并行度数。

/**
* Using mapAsync operator, we pass a function which return a Future, the number of parallel run futures will
* be determine by the argument passed to the operator.
*/
@Test def readAsync(): Unit = {
Source(0 to 10)//-->Your files
.mapAsync(5) { value => //-> It will run in parallel 5 reads
implicit val ec: ExecutionContext = ActorSystem().dispatcher
Future {
//Here read your file
Thread.sleep(500)
println(s"Process in Thread:${Thread.currentThread().getName}")
value
}
}
.runWith(Sink.foreach(value => println(s"Item emitted:$value in Thread:${Thread.currentThread().getName}")))
}

您可以在此处了解有关 akka 和 akka 流的更多信息 https://github.com/politrons/Akka

最新更新