我有一个rest控制器,该控制器调用服务,进而调用演员从模拟数据库中获取查询。该消息传给了演员,但是该应用程序在演员响应之前就崩溃了,并且演员有一个无效的指针例外。我正在为控制器和路由指令使用Akka HTTP来构成响应。这些是我的依赖性:
"com.typesafe.akka" %% "akka-http" % "10.1.8",
"com.typesafe.akka" %% "akka-actor" % "2.5.22",
"com.typesafe.akka" %% "akka-stream" % "2.5.22",
"com.typesafe.akka" %% "akka-http-spray-json" % "10.1.8"
class CacheActor extends Actor {
val tweetRepositoryInMemory: TweetRepositoryInMemory = new TweetRepositoryInMemory()
val log = Logging(context.system, this)
var tweetMap: scala.collection.mutable.Map[String, List[String]] =
scala.collection.mutable.Map[String, List[String]]()
// consult the in-memory map, if the username is not found, call the repository, update the map, and return the tweets
def queryRepo(username: String): Future[Option[List[String]]] = {
if (tweetMap isDefinedAt username) {
return Future(tweetMap.get(username))
} else {
var listOfTweetTexts: List[String] = List[String]()
val queryLimit = 10
val resultTweets: Future[Seq[Tweet]] = tweetRepositoryInMemory.searchByUserName(username, queryLimit)
onComplete(resultTweets) {
case Success(tweets) =>
for (tweet <- tweets) { listOfTweetTexts ::= tweet.text; }
tweetMap(username) = listOfTweetTexts
return Future(Option(listOfTweetTexts))
case Failure(t) =>
log.error("An error has occurred: " + t.getMessage)
return null
}
}
return null
}
def receive = {
case message: TweetQuery => // .take(message.limit)
val queryResult: Future[Option[List[String]]] = queryRepo(message.userName)
queryResult onComplete {
case Success(result) => sender() ! result
case Failure(t) => log.error("An error has occurred: " + t.getMessage)
}
}
}
stacktrace会很有帮助,但我怀疑您的receive
中的这一行会导致NPE:
queryResult onComplete {
您的queryRepo
函数返回null
如果tweetMap
未定义在username
。
update
这就是原因:
queryRepo
功能在一个情况下返回Furture[Seq[String]]
if (tweetMap isDefinedAt username) {
return Future(tweetMap.get(username))
}
在其他块中您创建Future[Seq[String]]
val resultTweets: Future[Seq[String]] = tweetRepositoryInMemory.searchByUserName(username, queryLimit)
,但是您永远不会回来。而不是您将回调传递给未来时的未来执行,因此在期货完成时会异步执行,因此onComplete
。(我注意到您不直接在Future
上调用onComplete
,而是一个函数onComplete
,该功能将未来和部分函数作为参数,我假设该功能会登记常规onComplete
回调。(
因此,如果语句为Unit
和不是 Future[Seq[String]]
代码
for (tweet <- tweets) { listOfTweetTexts ::= tweet.text; }
tweetMap(username) = listOfTweetTexts
return Future(Option(listOfTweetTexts))
很可能是在queryRepo
返回null
之后执行的。
def queryRepo(username: String): Future[Option[List[String]]] = {
if (tweetMap isDefinedAt username) {
...
} else {
...
}
return null // <--- here
}
结束更新
如果更改以下行:
val resultTweets: Future[Seq[Tweet]] = tweetRepositoryInMemory.searchByUserName(username, queryLimit)
onComplete(resultTweets) {
case Success(tweets) =>
for (tweet <- tweets) { listOfTweetTexts ::= tweet.text; }
tweetMap(username) = listOfTweetTexts
return Future(Option(listOfTweetTexts))
case Failure(t) =>
log.error("An error has occurred: " + t.getMessage)
return null
}
to:
tweetRepositoryInMemory.searchByUserName(username, queryLimit).map { tweets =>
// NOTE: This happens asynchronously in the `Future`. IT is better not to close over local variables
val listOfTweetTexts = for (tweet <- tweets) yield { tweet.text }
// again, access to an actor member from within a `Future` is not wise or rather a bug in the making.
// But I will not refactor that much here. The way to do this would be to send a message to self and process the mutable member within `receive`
tweetMap(username) = listOfTweetTexts
Option(listOfTweetTexts)
}
NullPointerException
不应再发生。
但是,我的印象是,您可以在Scala中使用 Futures , Actors 和功能编程的更多培训。
例如,
- 演员的可变局部状态只有从其
receive
访问而不是在异步Future
或Thread
中访问的情况下才能起作用。 - 一个ususly在
- 如果不是Java API互操作性,则无需 ever 返回
null
。 - 还有更多要点...