import scala.collection.JavaConverters._
import scala.concurrent.Await
import scala.concurrent.Future
import scala.concurrent.duration._
import scala.io._
import org.json.JSONObject
import org.json4s.native.Serialization._
import org.mongodb.scala._
import org.mongodb.scala.model.Filters._
import org.mongodb.scala.model.Updates._
import org.mongodb.scala.Document._
import com.mongodb.MongoCredential
import com.mongodb.async.client.MongoClientSettings
import com.mongodb.connection.ClusterSettings
import com.mongodb.client.model.UpdateOptions
import com.mongodb.client.result.UpdateResult
import scala.concurrent.ExecutionContext
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration
import scala.concurrent._
import scala.util._
object Async {
def main(args: Array[String]): Unit =
{
var mongoClient: MongoClient = null
val credential: MongoCredential = MongoCredential.createScramSha1Credential("user", "database", "password".toArray)
val clusterSettings: ClusterSettings = ClusterSettings.builder().hosts(List(new ServerAddress("localhost:27017")).asJava).build()
val settings: MongoClientSettings = MongoClientSettings.builder().codecRegistry(MongoClient.DEFAULT_CODEC_REGISTRY).clusterSettings(clusterSettings).credentialList(List(credential).asJava).build()
mongoClient = MongoClient(settings)
val db: MongoDatabase = mongoClient.getDatabase("database")
var collection: MongoCollection[Document] = db.getCollection("collectionName")
var output : Document = Document()
var query: Document = Document()
var projection: Document = Document()
output = find(query, projection, collection)
println(output)
}
def find(query: Document, projection: Document, collectionName : MongoCollection[Document]) : Document = {
var previousDoc : Document = Document()
var future = collectionName.find(equal("_id", query)).projection(projection).toFuture()
collectionName.find(equal("_id", query)).projection(projection).subscribe(
(data: Document) => { previousDoc = data },
(error: Throwable) => println(s"Query failed: ${error.getMessage}"),
() => println("Done")
)
Await.result(future, Duration(100000, MILLISECONDS))
previousDoc
}
}
我在mongodb中执行了查找操作,这里是使用scala的代码。但是上述代码的执行是非阻塞的,所以在从mongodb检索数据之前进程结束。我想知道,如何控制非阻塞而不使用Await和Thread执行mongodb操作。睡眠功能。
Future是一种占位符对象,您可以为尚不存在的结果创建它。一般来说,Future的结果是并发计算的,可以稍后收集。
Scala提供了flatMap、foreach和filter等组合子,用于以非阻塞的方式组合未来。
你可以这样做
future.flatMap{
response => //do something when the response is available ...
}
用于补偿
for {
response <- fuurure
} yield{
//do something when the response is available ...
}
请参阅这个文件的未来和承诺http://docs.scala-lang.org/overviews/core/futures.html