如何在Scala Mongodb 3.0驱动程序中异步执行查找操作,而不指定Await和Thread


import scala.collection.JavaConverters._
import scala.concurrent.Await
import scala.concurrent.Future
import scala.concurrent.duration._
import scala.io._
import org.json.JSONObject
import org.json4s.native.Serialization._
import org.mongodb.scala._
import org.mongodb.scala.model.Filters._
import org.mongodb.scala.model.Updates._
import org.mongodb.scala.Document._
import com.mongodb.MongoCredential
import com.mongodb.async.client.MongoClientSettings
import com.mongodb.connection.ClusterSettings
import com.mongodb.client.model.UpdateOptions
import com.mongodb.client.result.UpdateResult
import scala.concurrent.ExecutionContext
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration
import scala.concurrent._
import scala.util._  
object Async {
 def main(args: Array[String]): Unit = 
{
var mongoClient: MongoClient = null
val credential: MongoCredential = MongoCredential.createScramSha1Credential("user", "database", "password".toArray)
val clusterSettings: ClusterSettings = ClusterSettings.builder().hosts(List(new ServerAddress("localhost:27017")).asJava).build()
val settings: MongoClientSettings = MongoClientSettings.builder().codecRegistry(MongoClient.DEFAULT_CODEC_REGISTRY).clusterSettings(clusterSettings).credentialList(List(credential).asJava).build()
mongoClient = MongoClient(settings)
val db: MongoDatabase = mongoClient.getDatabase("database")
var collection: MongoCollection[Document] = db.getCollection("collectionName")
var output : Document = Document()
var query: Document = Document()
var projection: Document = Document()
output = find(query, projection, collection)
println(output)
}
def find(query: Document, projection: Document, collectionName : MongoCollection[Document]) : Document = {
  var previousDoc : Document = Document()
        var future = collectionName.find(equal("_id", query)).projection(projection).toFuture()
                    collectionName.find(equal("_id", query)).projection(projection).subscribe(
                            (data: Document) => { previousDoc = data },                         
                            (error: Throwable) => println(s"Query failed: ${error.getMessage}"), 
                            () => println("Done")                                        
                            )
        Await.result(future, Duration(100000, MILLISECONDS))
        previousDoc
}
}

我在mongodb中执行了查找操作,这里是使用scala的代码。但是上述代码的执行是非阻塞的,所以在从mongodb检索数据之前进程结束。我想知道,如何控制非阻塞而不使用Await和Thread执行mongodb操作。睡眠功能。

Future是一种占位符对象,您可以为尚不存在的结果创建它。一般来说,Future的结果是并发计算的,可以稍后收集。

Scala提供了flatMap、foreach和filter等组合子,用于以非阻塞的方式组合未来。

你可以这样做

future.flatMap{ 
         response => //do something when the response is available ... 
 }

用于补偿

for {
     response <- fuurure
} yield{
     //do something when the response is available ... 
}

请参阅这个文件的未来和承诺http://docs.scala-lang.org/overviews/core/futures.html

最新更新