我有一个Akka服务器,他正在向演员询问木槌文件(一些输出(。但是,在木槌演员代码中,要执行几个步骤。在其中获取,修改文件,创建新文件并将其保存在资源目录中几次。
我需要按顺序运行 actor,所以我在调用未来时使用 map。但是,该作业正在获取NullPointerException,因为没有下一个将来的文件。一旦我停止服务器。所有文件都在资源目录中生成。
一旦个人未来完成,我需要资源目录中的文件。请建议
下面是我的服务器的代码
lazy val routes: Route = apiRoutes
Configuration.parser.parse(args,Configuration.ConfigurationOptions()) match {
case Some(config) =>
val serverBinding: Future[Http.ServerBinding] = Http().bindAndHandle(routes, config.interface, config.port)
serverBinding.onComplete {
case Success(bound) =>
println(s"Server online at http://${bound.localAddress.getHostString}:${bound.localAddress.getPort}/")
case Failure(e) =>
log.error("Server could not start: ", e)
system.terminate()
}
case None =>
system.terminate()
}
Await.result(system.whenTerminated, Duration.Inf)
}
下面是接收函数的代码。
def receive: Receive = {
case GetMalletOutput(malletFile) => createMalletResult(malletFile).pipeTo(sender())
}
def createMalletResult(malletFile: String): Future[MalletModel] = {
//sample malletResult
val topics = Array(Topic("1", "2").toJson)
var mM: Future[MalletModel] = Future{MalletModel("contentID", topics)}
//first Future to save file in resource
def saveFile(malletFile: String): Future[String] = Future {
val res = MalletResult(malletFile)
val converted = res.Score.parseJson.convertTo[MalletRepo]
val fileName = converted.ContentId
val fileTemp = new File("src/main/resources/new_corpus/" + fileName)
val output = new BufferedWriter(new FileWriter("src/main/resources/new_corpus/" + fileName))
output.write(converted.ContentText)
//output.close()
malletFile
}
//Second Future to used the resource file and create new one
def t2v(malletFile: String): Future[String] = Future{
val tmpDir = "src/main/resources/"
logger.debug(tmpDir.toString)
logger.debug("t2v Started")
Text2Vectors.main(("--input " + tmpDir + "new_corpus/ --keep-sequence --remove-stopwords " + "--output " + tmpDir + "new_corpus.mallet --use-pipe-from " + tmpDir + "corpus.mallet").split(" "))
logger.debug("text2Vector Completed")
malletFile
}
//another future to take file from resource and save in the new file back in resource
def infer(malletFile: String): Future[String] = Future {
val tmpDir = "src/main/resources/"
val tmpDirNew = "src/main/resources/inferResult/"
logger.debug("infer started")
InferTopics.main(("--input " + tmpDir + "new_corpus.mallet --inferencer " + tmpDir + "inferencer " + "--output-doc-topics " + tmpDirNew + "doc-topics-new.txt --num-iterations 1000").split(" "))
logger.debug("infer Completed")
malletFile
}
//final future to return the requested output using the saved future
def response(malletFile: String): Future[MalletModel] = Future{
logger.debug("response Started")
val lines = Source.fromResource("src/main/resources/inferResult/doc-topics-new.txt")
.getLines
.toList
.drop(1) match {
case Nil => List.empty
case x :: xs => x.split(" ").drop(2).mkString(" ") :: xs
}
logger.debug("response On")
val result = MalletResult(malletFile)
val convert = result.Score.parseJson.convertTo[MalletRepo]
val contentID = convert.ContentId
val inFile = lines.mkString(" ")
val a = inFile.split(" ").zipWithIndex.collect { case (v, i) if (i % 2 == 0) =>
(v, i)
}.map(_._1)
val b = inFile.split(" ").zipWithIndex.collect { case (v, i) if (i % 2 != 0) =>
(v, i)
}.map(_._1)
val paired = a.zip(b) // [(s,t),(s,t)]
val topics = paired.map(x => Topic(x._2, x._1).toJson)
logger.debug("validating")
logger.debug("mallet results...")
logger.debug("response Done")
MalletModel(contentID, topics)
}
//calling one future after another to run future sequntially
val result: Future[MalletModel] =
saveFile(malletFile).flatMap(malletFile =>
t2v(malletFile).flatMap(mf =>
infer(mf).flatMap(mf =>
response(mf))))
result
}
}
看来我自己已经得到了答案。问题是我试图在资源目录中写入文件并读回文件。为了解决这个问题,我在 tmpDir 中编写了文件,并使用缓冲区阅读器而不是源读取它。砰的一声,它奏效了。