我有一个JSON文件,我想使用Spark SQL将其加载到MongoDB中。我有办法将单个元素加载到如下所示的集合中
val mongoClient = MongoClient(127.0.0.1, 27017)
val collection = mongoClient(dbname)(collection_name)
for (a <- 1 to 10) {
collection.insert {
MongoDBObject("id" -> a.toString,
"age" -> (10 + a),
"description" -> s"description $a",
"enrolled" -> (a % 2 == 0),
"name" -> s"Name $a")
}
由于MongoDB已经以JSON格式存储数据,有没有办法直接加载我的JSON文件?
当然,MongoDB的底部数据格式是BSON,但请记住,我们可以使用mongoexport
以json格式导出MongoDB文档。也许这与你的情况无关,我的观点是我们实际上可以使用SparkSQL加载json文件,以下是我尝试完成它的方式。
我们需要的一个重要库是Spark库,用于轻松访问MongoDB。
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
import com.stratio.datasource.mongodb._
import com.stratio.datasource.mongodb.MongodbConfig._
object DFToMongoDB extends App {
lazy val sc = new SparkContext(new SparkConf()
.setAppName("mongodb")
.setMaster("local[4]"))
val sqlContext = new SQLContext(sc)
/*
sample data in xxx.json
{
"_id":"xxxx","workHome":false,"commute":true,
"tel":false,"weekend":true,"age":100.0
}
*/
val dataFrame = sqlContext.read.json("xxx.json")
def forCollection(collectionName: String) = {
MongodbConfigBuilder(
Map(
Host -> List("127.0.0.1"), Database -> "xxx",
Collection -> collectionName,
SamplingRatio -> 1.0,
WriteConcern -> com.mongodb.casbah.WriteConcern.Acknowledged,
SplitSize -> 8, SplitKey -> "_id"
)
).build
}
// dataFrame will be converted to MongodbDataFrame here
dataFrame.saveToMongodb(forCollection("xxx"))
}
此外,我尝试的情况是加载之前 MongoDB 中不存在集合,之后 MongoDB 将使用 json 文件中的记录创建集合。至于其他情况,如果您愿意,可以尝试。
无论如何,希望它有所帮助。