Akka Actor抛出了无效的指针错误,原因未知



我有一个rest控制器,该控制器调用服务,进而调用演员从模拟数据库中获取查询。该消息传给了演员,但是该应用程序在演员响应之前就崩溃了,并且演员有一个无效的指针例外。我正在为控制器和路由指令使用Akka HTTP来构成响应。这些是我的依赖性:

"com.typesafe.akka" %% "akka-http"   % "10.1.8",
"com.typesafe.akka" %% "akka-actor"  % "2.5.22",
"com.typesafe.akka" %% "akka-stream" % "2.5.22",
"com.typesafe.akka" %% "akka-http-spray-json" % "10.1.8"


class CacheActor extends Actor {
  val tweetRepositoryInMemory: TweetRepositoryInMemory = new TweetRepositoryInMemory()
  val log = Logging(context.system, this)
  var tweetMap: scala.collection.mutable.Map[String, List[String]] =
    scala.collection.mutable.Map[String, List[String]]()
  // consult the in-memory map, if the username is not found, call the repository, update  the map, and return the tweets
  def queryRepo(username: String): Future[Option[List[String]]] = {
    if (tweetMap isDefinedAt username) {
      return Future(tweetMap.get(username))
    } else {
      var listOfTweetTexts: List[String] = List[String]()
      val queryLimit = 10
      val resultTweets: Future[Seq[Tweet]] = tweetRepositoryInMemory.searchByUserName(username, queryLimit)
      onComplete(resultTweets) {
        case Success(tweets) =>
          for (tweet <- tweets) { listOfTweetTexts ::= tweet.text; }
          tweetMap(username) = listOfTweetTexts
          return Future(Option(listOfTweetTexts))
        case Failure(t) =>
          log.error("An error has occurred: " + t.getMessage)
          return null
      }
    }
    return null
  }
  def receive = {
    case message: TweetQuery => // .take(message.limit)
      val queryResult: Future[Option[List[String]]] = queryRepo(message.userName)
      queryResult onComplete {
        case Success(result) => sender() ! result
        case Failure(t) => log.error("An error has occurred: " + t.getMessage)
      }
  }
}

stacktrace会很有帮助,但我怀疑您的receive中的这一行会导致NPE:

queryResult onComplete {

您的queryRepo函数返回null如果tweetMap未定义在username

update

这就是原因:

queryRepo功能在一个情况下返回Furture[Seq[String]]

if (tweetMap isDefinedAt username) {
  return Future(tweetMap.get(username))
}

在其他块中您创建Future[Seq[String]]

val resultTweets: Future[Seq[String]] = tweetRepositoryInMemory.searchByUserName(username, queryLimit)

,但是您永远不会回来。而不是您将回调传递给未来时的未来执行,因此在期货完成时会异步执行,因此onComplete。(我注意到您不直接在Future上调用onComplete,而是一个函数onComplete,该功能将未来和部分函数作为参数,我假设该功能会登记常规onComplete回调。(

。( 。

因此,如果语句为Unit不是 Future[Seq[String]]

代码

 for (tweet <- tweets) { listOfTweetTexts ::= tweet.text; }
 tweetMap(username) = listOfTweetTexts
 return Future(Option(listOfTweetTexts))

很可能是在queryRepo返回null之后执行的。

def queryRepo(username: String): Future[Option[List[String]]] = {
  if (tweetMap isDefinedAt username) {
    ...
  } else {
    ...
  }
  return null // <--- here
}

结束更新

如果更改以下行:

val resultTweets: Future[Seq[Tweet]] = tweetRepositoryInMemory.searchByUserName(username, queryLimit)
  onComplete(resultTweets) {
    case Success(tweets) =>
      for (tweet <- tweets) { listOfTweetTexts ::= tweet.text; }
      tweetMap(username) = listOfTweetTexts
      return Future(Option(listOfTweetTexts))
    case Failure(t) =>
      log.error("An error has occurred: " + t.getMessage)
      return null
  }

to:

tweetRepositoryInMemory.searchByUserName(username, queryLimit).map { tweets =>
  // NOTE: This happens asynchronously in the `Future`. IT is better not to close over local variables  
  val listOfTweetTexts = for (tweet <- tweets) yield { tweet.text }
  // again, access to an actor member from within a `Future` is not wise or rather a bug in the making. 
  // But I will not refactor that much here. The way to do this would be to send a message to self and process the mutable member within `receive`
  tweetMap(username) = listOfTweetTexts
  Option(listOfTweetTexts)
}

NullPointerException不应再发生。

但是,我的印象是,您可以在Scala中使用 Futures Actors 功能编程的更多培训。

例如,

  • 演员的可变局部状态只有从其receive访问而不是在异步FutureThread
  • 中访问的情况下才能起作用。
  • 一个ususly在
  • 如果不是Java API互操作性,则无需 ever 返回null
  • 还有更多要点...

最新更新