Understanding implicits and subscribe in the MongoDB Scala driver source


I'm trying to get a better understanding of the Scala MongoDB driver source.

I'm using the Scala MongoDB driver (API docs: http://mongodb.github.io/mongo-scala-driver/).

When I use

    val collection: MongoCollection[Document] = database.getCollection("mycollection")
    val observable: Observable[Completed] = collection.insertOne(doc)

    observable.subscribe(new Observer[Completed] {
      override def onNext(result: Completed): Unit = println("Inserted")
      override def onError(e: Throwable): Unit = println("Failed")
      override def onComplete(): Unit = println("Completed")
    })

is this implicit method

    /**
     * Subscribes to the [[Observable]] and requests `Long.MaxValue`.
     *
     * Uses the default or overridden `onNext`, `onError`, `onComplete` partial functions.
     *
     * @param doOnNext anonymous function to apply to each emitted element.
     * @param doOnError anonymous function to apply if there is an error.
     * @param doOnComplete anonymous function to apply on completion.
     */
    def subscribe(doOnNext: T => Any, doOnError: Throwable => Any, doOnComplete: () => Any): Unit = {
      observable.subscribe(new Observer[T] {
        override def onSubscribe(subscription: Subscription): Unit = subscription.request(Long.MaxValue)
        override def onNext(tResult: T): Unit = doOnNext(tResult)
        override def onError(throwable: Throwable): Unit = doOnError(throwable)
        override def onComplete(): Unit = doOnComplete()
      })
    }

Source: https://github.com/mongodb/mongo-scala-driver/blob/master/driver/src/main/scala/org/mongodb/scala/ObservableImplicits.scala

called from the following?

  /**
   * Request `Observable` to start streaming data.
   *
   * This is a "factory method" and can be called multiple times, each time starting a new [[Subscription]].
   * Each `Subscription` will work for only a single [[Observer]].
   *
   * If the `Observable` rejects the subscription attempt or otherwise fails it will signal the error via [[Observer.onError]].
   *
   * @param observer the `Observer` that will consume signals from this `Observable`
   */
  def subscribe(observer: Observer[_ >: T]): Unit

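Source: https://github.com/mongodb/mongo-scala-driver/blob/master/driver/src/main/scala/org/mongodb/scala/Observable.scala

It seems that calling subscribe starts a new thread (since it is called subscribe), but I can't see where in the source this new thread is started.

Are implicits used to achieve this "wiring" when I call observable.subscribe(new Observer[Completed] {...?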

Update:

Using this code:

import org.mongodb.scala._
import org.mongodb.scala.bson.collection.immutable.Document
import org.scalatest._

class MongoSpec extends FlatSpec with Matchers {
  "Test MongoDb" should "insert" in {
    val mongoClient: MongoClient = MongoClient()
    val database: MongoDatabase = mongoClient.getDatabase("scala-poc")
    val doc: Document = Document("_id" -> 6, "name" -> "MongoDB", "type" -> "database",
      "count" -> 1, "info" -> Document("x" -> 203, "y" -> 100))
    val collection: MongoCollection[Document] = database.getCollection("documents")
    val observable: Observable[Completed] = collection.insertOne(doc)
    observable.subscribe(new Observer[Completed] {
      override def onNext(result: Completed): Unit = println("Inserted")
      override def onError(e: Throwable): Unit = println("\n\nFailed " + e + "\n\n")
      override def onComplete(): Unit = println("Completed")
    })
    // close() is called as soon as subscribe returns
    mongoClient.close()
  }
}

This causes the following exception:

Failed com.mongodb.MongoClientException: Shutdown in progress

mongoClient.close() is being called before the insertOne method completes.

So is the insertOne or subscribe method asynchronous?
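If the insert does complete asynchronously, the test would have to wait for a terminal signal (onError or onComplete) before closing the client. One common way to do that is a latch that the observer releases. The following is only a minimal sketch of that idea: `TerminalObserver`, `fakeInsert`, and `insertAndWait` are hypothetical stand-ins so the snippet runs without a database, and they are not part of the driver's API.

```scala
import java.util.concurrent.{CountDownLatch, TimeUnit}
import scala.concurrent.{ExecutionContext, Future}

object AwaitSketch {
  // Hypothetical stand-in for the driver's Observer; only terminal signals matter here.
  trait TerminalObserver {
    def onError(e: Throwable): Unit
    def onComplete(): Unit
  }

  // Hypothetical stand-in for an asynchronous insert: the work happens on
  // another thread and the observer is signalled later.
  def fakeInsert(observer: TerminalObserver)(implicit ec: ExecutionContext): Unit = {
    Future {
      Thread.sleep(100) // simulate I/O latency
      observer.onComplete()
    }
    ()
  }

  // Blocks the caller until a terminal signal arrives or the timeout elapses.
  // Returns true if a signal arrived in time.
  def insertAndWait(timeoutMillis: Long): Boolean = {
    import ExecutionContext.Implicits.global
    val done = new CountDownLatch(1)
    fakeInsert(new TerminalObserver {
      def onError(e: Throwable): Unit = done.countDown()
      def onComplete(): Unit = done.countDown()
    })
    done.await(timeoutMillis, TimeUnit.MILLISECONDS)
  }
}
```

In the test above, the same pattern would mean calling `countDown()` from the real observer's onError and onComplete, and awaiting the latch before mongoClient.close().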

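  1. No. subscribe(doOnNext, doOnError, doOnComplete) calls subscribe(observer), as the implementation quoted in the question shows. So if it were also called from there, you would get an infinite loop. The "wiring" is used when you write something like observable.subscribe(x => println(s"next = $x"), error => error.printStackTrace(), () => {}).

  2. No, subscribe does not create a new thread. The classes that implement Observable mostly wrap classes from the Java MongoDB driver and call those classes' own subscribe methods, for example override def subscribe(observer: Observer[_ >: TResult]): Unit = observe(wrapped).subscribe(observer). Those subscribe methods do not start new threads either; for some explanation, see https://mongodb.github.io/mongo-java-driver/3.1/driver-async/reference/observables/.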
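The wiring from point 1 can be sketched without the driver. The snippet below is a simplified model, not the driver's code: `Observer`, `Observable`, `FunctionSubscribe`, and `fromItems` are stand-ins defined here for illustration, and Subscription and back-pressure are left out. It shows an implicit class adding a function-based subscribe overload that delegates to subscribe(observer), on a toy source that emits synchronously on the calling thread.

```scala
object WiringSketch {
  // Simplified stand-ins for the driver's types (assumption: no Subscription).
  trait Observer[T] {
    def onNext(result: T): Unit
    def onError(e: Throwable): Unit
    def onComplete(): Unit
  }

  trait Observable[T] {
    def subscribe(observer: Observer[_ >: T]): Unit
  }

  // The implicit class supplies a three-function overload of subscribe.
  // It only wraps the functions in an Observer and delegates one way.
  implicit class FunctionSubscribe[T](observable: Observable[T]) {
    def subscribe(doOnNext: T => Any, doOnError: Throwable => Any, doOnComplete: () => Any): Unit =
      observable.subscribe(new Observer[T] {
        def onNext(result: T): Unit = doOnNext(result)
        def onError(e: Throwable): Unit = doOnError(e)
        def onComplete(): Unit = doOnComplete()
      })
  }

  // Toy source: emits every item, then completes, all on the caller's thread.
  def fromItems[T](items: T*): Observable[T] = new Observable[T] {
    def subscribe(observer: Observer[_ >: T]): Unit = {
      items.foreach(item => observer.onNext(item))
      observer.onComplete()
    }
  }
}
```

With `import WiringSketch._` in scope, a call such as `fromItems(1, 2, 3).subscribe((x: Int) => println(x), (e: Throwable) => e.printStackTrace(), () => println("done"))` goes through the implicit class, while a call that passes an Observer uses the trait's own method directly. The parameter types are annotated explicitly here so the lambdas type-check when the overload is found through the implicit conversion.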
