MongoDB Scala-特定字段值的查询文档



所以我知道在蒙哥·壳牌中,您使用点符号在任何文档中获取所需的字段。

在mongodb scala中如何实现点符号。我对它的工作原理感到困惑。这是从集合中获取文档的代码:

val record = collection.find().projection(fields(include("offset"), excludeId())).limit(1)

编辑:

我正在尝试制定一种机制,以基本上重新处理Kafka记录,以关闭消费者。为此,我将KAFKA记录存储在外部数据库中,然后尝试从那里获取最新偏移量,并从那时开始消耗。这是我的Scala方法,应该这样做:

def getLatestCommitOffsetFromDB(collectionName: String): Long = {
import com.mongodb.Block
import org.bson.Document
val printBlock = new Block[Document]() {
  override def apply(document: Document): Unit = {
    println(document.toJson)
  }
}
import com.mongodb.async.SingleResultCallback
val callbackWhenFinished = new SingleResultCallback[Void]() {
  override def onResult(result: Void, t: Throwable): Unit = {
    System.out.println("Latest offset fetched from database.")
  }
}
var obj: String = " "
try {
  val record = collection.find().projection(fields(include("offset"), excludeId())).limit(1)
  //TODO FIND A WAY TO GET THE VALUE AND STORE IT IN A VARIABLE
} catch {
  case e: RuntimeException =>
    logger.error(s"MongoDB Server Error : Unable to fetch data from collection : $collection")
    logger.error(e.printStackTrace().toString())
}
obj.toLong
}

问题不是我可以从Mongo那里获取文档,因此我正在尝试访问Mongo的特定领域。该文档中有四个字段:主题,分区,消息和偏移。我想获得"偏移"字段并将其存储在变量中,因此我可以将其用作重新处理Kafka记录的重新启动点。

我要去哪里?

pom.xml

<?xml version="1.0" encoding="UTF-8"?>

http://maven.apache.org/xsd/maven-4.0.0.xsd"> 4.0.0

<groupId>OffsetManagementPoC</groupId>
<artifactId>OffsetManagementPoC</artifactId>
<version>1.0-SNAPSHOT</version>
<dependencies>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>1.0.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.12</artifactId>
        <version>1.0.0</version>
    </dependency>
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-compiler</artifactId>
        <version>2.11.8</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>1.7.25</version>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-streams</artifactId>
        <version>0.10.0.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.11</artifactId>
        <version>2.2.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
        <version>2.2.0</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-kafka-0-10 -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
        <version>2.2.0</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.core/jackson-core -->
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-core</artifactId>
        <version>2.6.5</version>
    </dependency>
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-databind</artifactId>
        <version>2.6.5</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.core/jackson-annotations -->
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-annotations</artifactId>
        <version>2.6.5</version>
    </dependency>
    <dependency>
        <groupId>org.mongodb</groupId>
        <artifactId>casbah_2.12</artifactId>
        <version>3.1.1</version>
        <type>pom</type>
    </dependency>
    <dependency>
        <groupId>com.typesafe</groupId>
        <artifactId>config</artifactId>
        <version>1.2.1</version>
    </dependency>
    <dependency>
        <groupId>org.mongodb.scala</groupId>
        <artifactId>mongo-scala-driver_2.12</artifactId>
        <version>2.1.0</version>
    </dependency>
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-compiler</artifactId>
        <version>2.11.8</version>
    </dependency>
    <dependency>
        <groupId>org.mongodb</groupId>
        <artifactId>mongo-java-driver</artifactId>
        <version>3.4.2</version>
    </dependency>
    <dependency>
        <groupId>org.mongodb.scala</groupId>
        <artifactId>mongo-scala-driver_2.11</artifactId>
        <version>2.1.0</version>
    </dependency>
    <dependency>
        <groupId>org.mongodb</groupId>
        <artifactId>bson</artifactId>
        <version>3.3.0</version>
    </dependency>
    <dependency>
        <groupId>org.mongodb</groupId>
        <artifactId>mongodb-driver-async</artifactId>
        <version>3.4.3</version>
    </dependency>
    <dependency>
        <groupId>org.mongodb.scala</groupId>
        <artifactId>mongo-scala-bson_2.11</artifactId>
        <version>2.1.0</version>
    </dependency>
</dependencies>

您可以这样修改查询:

import com.mongodb.MongoClient
import com.mongodb.client.MongoCollection
import com.mongodb.client.model.Projections
def getLatestCommitOffsetFromDB(
  databaseName: String,
  collectionName: String
): Long = {
  val mongoClient = new MongoClient("localhost", 27017);
  val collection =
    mongoClient.getDatabase(databaseName).getCollection(collectionName)
  val record = collection
    .find()
    .projection(
      Projections
        .fields(Projections.include("offset"), Projections.excludeId()))
    .first
  record.get("offset").asInstanceOf[Double].toLong
}

我认为您缺少com.mongodb.client.model.Projections进口,以便使用fieldsincludeexcludeId

我使用first而不是limit(1)使提取结果更容易。

first返回一个Document对象,您可以在该对象上调用get以检索请求字段的值。

但实际上,由于您只想要一个记录和一个字段,因此可以删除投影!:

val record = collection.find().first

根据文档, collection.find()接受 com.mongodb.DBObject

您可以使用该接口的实现之一是BasicDBObject,基本上与mutable.Map[String, Object]一样。您可以使用接受地图的构造函数:

val query = new com.mongodb.BasicDBObject(Map(
  "foo.bar" -> "value1"
  "bar.foo" -> "value2"
))
val record = collection.find(query)....

最新更新