Play + ReactiveMongo:封顶收集和可追踪光标



我使用Play框架与Scala, Akka和ReactiveMongo。我想在MongoDB中使用一个集合作为一个循环队列。几个参与者可以向其中插入文档;一个参与者在这些文档可用时立即检索它们(一种发布-订阅系统)。我使用了封顶集合和可跟踪游标。每次我检索一些文档时,我都必须运行命令EmptyCapped来刷新被封顶的集合(不可能从其中删除元素),否则我总是检索相同的文档。有没有别的解决办法?例如,是否有一种方法可以滑动光标而不移除元素?或者在我的情况下最好不要使用封顶收集?

object MexDB {
def db: reactivemongo.api.DB = ReactiveMongoPlugin.db
val size: Int = 10000
// creating capped collection
val collection: JSONCollection = {
    val c = db.collection[JSONCollection]("messages")
    val isCapped = coll.convertToCapped(size, None)
    Await.ready(isCapped, Duration.Inf)
    c
}
def insert(mex: Mex) = {
    val inserted = collection.insert(mex)
    inserted onComplete {
      case Failure(e) =>
        Logger.info("Error while inserting task: " + e.getMessage())
        throw e
      case Success(i) =>
        Logger.info("Successfully inserted task")
    }
}

def find(): Enumerator[Mex] = {
  val cursor: Cursor[Mex] = collection
    .find(Json.obj())
    .options(QueryOpts().tailable.awaitData)
    .cursor[Mex]
    // meaning of maxDocs ???
    val maxDocs = 1
    cursor.enumerate(maxDocs)
}

def removeAll() = {
    db.command(new EmptyCapped("messages"))
}

}

/*** part of receiver actor code ***/
// inside preStart
val it = Iteratee.fold[Mex, List[Mex]](Nil) {
    (partialList, mex) => partialList ::: List(mex)
}
// Inside "receive" method
case Data =>
  val e: Enumerator[Mex] = MexDB.find()
  val future = e.run(it)
  future onComplete {
    case Success(list) =>
      list foreach { mex =>
        Logger.info("Mex: " + mex.id)
      }
      MexDB.removeAll()
      self ! Data
    case Failure(e) => Logger.info("Error:  "+ e.getMessage())
  }

您的可跟踪游标在每个找到的文档之后作为maxDocs = 1关闭。要使它无限期地打开,你应该忽略这个限制。

对于awaitData,只有在显式关闭RM时才会调用.onComplete

您需要使用游标中的一些流函数,例如.enumerate,并处理每个新的步骤/结果。见https://github.com/sgodbillon/reactivemongo-tailablecursor-demo/

最新更新