在这个 Akka 和 Cassandra 教程中,写入 Cassandra 并期望读回相同数据的集成测试在写入和读取之间插入 1 秒的延迟。延迟允许有时间通过网络传输写入请求并在服务器上进行处理。这是必需的,因为应用程序调用session.executeAsync
向 Cassandra 发送请求,并且继续而不处理来自 Cassandra 的响应:
class TweetWriterActor(cluster: Cluster) extends Actor {
val session = cluster.connect(Keyspaces.akkaCassandra)
val preparedStatement = session.prepare("INSERT INTO tweets(key, user_user, text, createdat) VALUES (?, ?, ?, ?);")
def saveTweet(tweet: Tweet): Unit =
session.executeAsync(preparedStatement.bind(tweet.id.id, tweet.user.user, tweet.text.text, tweet.createdAt))
def receive: Receive = {
case tweets: List[Tweet] => tweets.foreach(saveTweet)
case tweet: Tweet => saveTweet(tweet)
}
}
完成写入请求所需的时间通常比 1 秒少得多,因此,如果在尝试读取之前收到写入完成的通知,测试可以运行得更快。您将如何更改代码以执行此操作,同时坚持非阻塞 I/O 操作?
executeAsync返回[java] future,你可以等待它或附加一些回调(感谢它从guava实现了ListenableFuture)。一般的替代方法可能是从 akka 未来内部使用同步 api。
Apache Cassandra 和 Datastax Enterprise 的官方 Scala 驱动程序完全支持 CQL 3.0,是幻影。
Phantom是在Datastax官方合作伙伴Websudos开发的,明确是为了取代所有其他驱动程序。它正在积极开发和维护,完全支持所有最新的Cassandra功能。
要在写入完成时收到"通知",请使用默认 api:SomeTable.update.where(_.id eqs id).update(name setTo "test").future()
这将返回Future[ResultSet]
,当未来完成时,操作也是如此。