使用 sparksql 在 mongodb 中加载 JSON 数据



我有一个JSON文件,我想使用Spark SQL将其加载到MongoDB中。我有办法将单个元素加载到如下所示的集合中

val mongoClient = MongoClient(127.0.0.1, 27017)
val collection = mongoClient(dbname)(collection_name)
for (a <- 1 to 10) {
  collection.insert {
    MongoDBObject("id" -> a.toString,
      "age" -> (10 + a),
      "description" -> s"description $a",
      "enrolled" -> (a % 2 == 0),
      "name" -> s"Name $a")
  }

由于MongoDB已经以JSON格式存储数据,有没有办法直接加载我的JSON文件?

当然,MongoDB的底部数据格式是BSON,但请记住,我们可以使用mongoexport以json格式导出MongoDB文档。也许这与你的情况无关,我的观点是我们实际上可以使用SparkSQL加载json文件,以下是我尝试完成它的方式。

我们需要的一个重要库是Spark库,用于轻松访问MongoDB。

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
import com.stratio.datasource.mongodb._
import com.stratio.datasource.mongodb.MongodbConfig._
object DFToMongoDB extends App {
  lazy val sc = new SparkContext(new SparkConf()
    .setAppName("mongodb")
    .setMaster("local[4]"))
  val sqlContext = new SQLContext(sc)
  /*
    sample data in xxx.json  
    {
      "_id":"xxxx","workHome":false,"commute":true,
      "tel":false,"weekend":true,"age":100.0
    }
  */
  val dataFrame = sqlContext.read.json("xxx.json")
  def forCollection(collectionName: String) = {
    MongodbConfigBuilder(
      Map(
        Host -> List("127.0.0.1"), Database -> "xxx", 
        Collection -> collectionName,
        SamplingRatio -> 1.0, 
        WriteConcern -> com.mongodb.casbah.WriteConcern.Acknowledged,
        SplitSize -> 8, SplitKey -> "_id"
      )
    ).build
  }
  // dataFrame will be converted to MongodbDataFrame here 
  dataFrame.saveToMongodb(forCollection("xxx"))
}

此外,我尝试的情况是加载之前 MongoDB 中不存在集合,之后 MongoDB 将使用 json 文件中的记录创建集合。至于其他情况,如果您愿意,可以尝试。

无论如何,希望它有所帮助。

相关内容

  • 没有找到相关文章

最新更新