Akka 流,源项目作为另一个源



我正在使用Alpakka-FTP,但也许我正在寻找一个通用的akka-stream模式。FTP 连接器可以列出文件或检索它们:

def ls(host: String): Source[FtpFile, NotUsed]
def fromPath(host: String, path: Path): Source[ByteString, Future[IOResult]]

理想情况下,我想创建一个这样的流:

LIST
  .FETCH_ITEM
  .FOREACH(do something)

但是我无法使用上面写的两个函数创建这样的流。我觉得我应该能够使用Flow到达那里,比如

Ftp.ls
  .via(some flow that uses the Ftp.fromPath above)
  .runWith(Sink.foreach(do something))

仅给定上述lsfromPath函数,这可能吗?

编辑:

我能够用一个演员和mapAsync来解决,但我仍然觉得它应该更直接。

class Downloader extends Actor {
  override def receive = {
    case ftpFile: FtpFile =>
      Ftp.fromPath(Paths.get(ftpFile.path), settings)
        .toMat(FileIO.toPath(Paths.get("testHDF.txt")))(Keep.right)
        .run() pipeTo sender
  }
}
val downloader = as.actorOf(Props(new Downloader))
Ftp.ls("test_path", settings)
  .mapAsync(1)(ftpFile => (downloader ? ftpFile) (3.seconds).mapTo[IOResult])
  .runWith(Sink.foreach(res => println("got it!" + res)))

您应该能够为此目的使用flatMapConcat。您的特定示例可以重写为

Ftp.ls("test_path", settings).flatMapConcat{ ftpFile =>
  Ftp.fromPath(Paths.get(ftpFile.path), settings)
}.runWith(FileIO.toPath(Paths.get("testHDF.txt")))

文档在这里。

最新更新