当 Cassandra 写入操作在 Akka 中完成时收到通知



在这个 Akka 和 Cassandra 教程中,写入 Cassandra 并期望读回相同数据的集成测试在写入和读取之间插入 1 秒的延迟。延迟允许有时间通过网络传输写入请求并在服务器上进行处理。这是必需的,因为应用程序调用session.executeAsync向 Cassandra 发送请求,并且继续而不处理来自 Cassandra 的响应:

class TweetWriterActor(cluster: Cluster) extends Actor {
  val session = cluster.connect(Keyspaces.akkaCassandra)
  val preparedStatement = session.prepare("INSERT INTO tweets(key, user_user, text, createdat) VALUES (?, ?, ?, ?);")
  def saveTweet(tweet: Tweet): Unit =
    session.executeAsync(preparedStatement.bind(tweet.id.id, tweet.user.user, tweet.text.text, tweet.createdAt))
  def receive: Receive = {
    case tweets: List[Tweet] => tweets.foreach(saveTweet)
    case tweet: Tweet        => saveTweet(tweet)
  }
}
完成写入

请求所需的时间通常比 1 秒少得多,因此,如果在尝试读取之前收到写入完成的通知,测试可以运行得更快。您将如何更改代码以执行此操作,同时坚持非阻塞 I/O 操作?

executeAsync返回[java] future,你可以等待它或附加一些回调(感谢它从guava实现了ListenableFuture)。一般的替代方法可能是从 akka 未来内部使用同步 api。

Apache Cassandra 和 Datastax Enterprise 的官方 Scala 驱动程序完全支持 CQL 3.0,是幻影。

Phantom是在Datastax官方合作伙伴Websudos开发的,明确是为了取代所有其他驱动程序。它正在积极开发和维护,完全支持所有最新的Cassandra功能。

要在写入完成时收到"通知",请使用默认 api:SomeTable.update.where(_.id eqs id).update(name setTo "test").future()

这将返回Future[ResultSet],当未来完成时,操作也是如此。

最新更新