如何从未来回调中保存和返回数据



过去几天我一直在面对问题,以保存&处理来自Scala期货的数据。我是两者的语言和概念的新手。Lagom在Cassandra上的文档说要实现大约9个代码文件,我想确保我的数据库代码在将其传播到这么多代码之前。

具体来说,我目前正在尝试实施概念证明,以将数据发送到lagom为您实施的Cassandra数据库。到目前为止,我能够将数据发送和从数据库中发送和检索,但是由于所有这些都不同步,因此我在返回数据时遇到了麻烦,并且还返回了数据成功返回的数据。

我已经玩了一段时间了。检索代码看起来像这样:

override def getBucket(logicalBucket: String) = ServiceCall[NotUsed, String] {
request => {
  val returnList = ListBuffer[String]()
  println("Retrieving Bucket " + logicalBucket)
  val readingFromTable = "SELECT * FROM data_access_service_impl.s3buckets;"
  //DB query
  var rowsFuture: Future[Seq[Row]] = cassandraSession.selectAll(readingFromTable)
  println(rowsFuture)
  Await.result(rowsFuture, 10 seconds)
  rowsFuture onSuccess {
    case rows => {
      println(rows)
      for (row <- rows) println(row.getString("name"))
      for (row <- rows) returnList += row.getString("name")
      println("ReturnList: " + returnList.mkString)
    }
  }
  rowsFuture onFailure {
    case e => println("An error has occured: " + e.getMessage)
    Future {"An error has occured: " + e.getMessage}
  } 
  Future.successful("ReturnList: " + returnList.mkString)
 }      
}

运行时,我在OnSuccess回调中获得了预期的数据库值。但是,我在返回语句中使用的相同变量在回调的打印范围之外为空(并返回空数据(。这也发生在我使用的"插入"函数中,其中它并不总是返回我在回调函数中设置的变量。

如果我尝试将语句放在回调函数中,则会给我一个错误的错误"返回单元,期望未来[字符串]'。因此,我被困在无法从回调函数中返回的地方,所以我不能保证自己返回数据(。

我的目标是将字符串返回到API,以便显示DB中所有S3存储桶名的列表。这意味着在未来的[seq [row]数据类型中迭代,并将数据保存到串联的字符串中。如果有人能提供帮助,他们将解决我通过Lagom,Akka,DataStax和Cassandra文档阅读的2周问题。目前,我很高兴(信息过载(,而且我在此上没有发现清晰指南。

供参考,这是Cassandrasession文档:

lagomtutorial/文档样式信息与唯一的cassandra Query示例cassandrasession.scala代码

有关Future,(和OptionEitherTry(的关键是您(通常(没有获得值 out ,您将计算带入他们。最常见的方法是使用mapflatMap方法。

在您的情况下,您需要服用Seq[Row]并将其转换为String。但是,您的Seq[Row]包裹在称为Future的不透明数据结构中,因此,如果您的实际实际具有Seq[Row],则不能像rows.mkString一样。因此,将您的计算rows.mkString带入数据:

而不是获得值并执行计算,而是将计算
//DB query
val rowsFuture: Future[Seq[Row]] = cassandraSession.selectAll(readingFromTable)
val rowToString = (row: Row) => row.getString("name")
val computation = (rows: Seq[Row]) => rows.map(rowToString).mkString
// Computation to the data, rather than the other way around
val resultFuture = rowsFuture.map(computation)

现在,当rowsFuture完成后,您通过调用rowsFuture.map创建的新未来将得到满足,结果是在Seq[Row]上调用computation,您实际上实际上关心

那时,您只能return resultFuture,一切都会按预期工作,因为调用getBucket的代码期望Future并将适当处理。

为什么Future不透明?

简单原因是因为它代表了当前可能不存在的值。您只能在值时获得值,但是当您启动呼叫时,它不存在。代码不是让您自己进行一些isComplete字段进行轮询,而是让您注册计算(例如onSuccessonFailure(或使用mapflatMap创建新的未来值。

更深的原因是因为 Future是一个单子,而单调涵盖计算,但没有从中提取该计算的操作

替换特定选择的选择和要为特定字段获得的字段。示例仅用于测试,不是架构。

package ldg.com.dbmodule.model
/**
* Created by ldipotet on 05/11/17.
*/
import com.datastax.driver.core.{Cluster, ResultSet, ResultSetFuture}
import scala.util.{Failure, Success, Try}
import java.util.concurrent.TimeUnit
import scala.collection.JavaConversions._
//Use Implicit Global Context is strongly discouraged! we must create    
//our OWN execution CONTEXT !
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.{Future, _}
import scala.concurrent.duration._

object CassandraDataStaxClient {
//We create here a CallBack in Scala with the DataStax api
implicit def resultSetFutureToScala(rf: ResultSetFuture):    
Future[ResultSet] = {
val promiseResult = Promise[ResultSet]()
val producer = Future {
    getResultSet(rf) match {
       //we write a promise with an specific value   
       case Success(resultset) => promiseResult success resultset
       case Failure(e) => promiseResult failure (new 
                       IllegalStateException)
    }
  }
  promiseResult.future
}
def getResultSet: ResultSetFuture => Try[ResultSet] = rsetFuture => {
    Try(
       // Other choice can be:
       // getUninterruptibly(long timeout, TimeUnit unit) throws 
          TimeoutException
       // for an specific time
       //can deal an IOException
       rsetFuture.getUninterruptibly
   )
 }
 def main(args: Array[String]) {
    def defaultFutureUnit() = TimeUnit.SECONDS
    val time = 20 seconds
    //Better use addContactPoints and adds more tha one contact point
   val cluster = Cluster.builder().addContactPoint("127.0.0.1").build()
   val session = cluster.connect("myOwnKeySpace")
   //session.executeAsync es asyncronous so we'll have here a 
   //ResultSetFuture
  //converted to a resulset due to Implicitconversion
  val future: Future[ResultSet] = session.executeAsync("SELECT * FROM 
  myOwnTable")
  //blocking on a future is strongly discouraged!! next is an specific     
  //case
  //to make sure that all of the futures have been completed
  val results = Await.result(future,time).all()
  results.foreach(row=>println(row.getString("any_String_Field"))
  session.close()
  cluster.close()
  }
 }

相关内容

  • 没有找到相关文章

最新更新