Spark:将数据帧导入mongodb(scala)



给定 Spark 中的以下数据框:

Name,LicenseID_1,TypeCode_1,State_1,LicenseID_2,TypeCode_2,State_2,LicenseID_3,TypeCode_3,State_3    
"John","123ABC",1,"WA","456DEF",2,"FL","789GHI",3,"CA"
"Jane","ABC123",5,"AZ","DEF456",7,"CO","GHI789",8,"GA"

我如何在 Spark 中使用 scala 将其作为文档集合写入 mongodb,如下所示:

{ "Name" : "John", 
  "Licenses" : 
  {
    [
      {"LicenseID":"123ABC","TypeCode":"1","State":"WA" },
      {"LicenseID":"456DEF","TypeCode":"2","State":"FL" },
      {"LicenseID":"789GHI","TypeCode":"3","State":"CA" }
    ]
  }
},
{ "Name" : "Jane", 
  "Licenses" : 
  {
    [
      {"LicenseID":"ABC123","TypeCode":"5","State":"AZ" },
      {"LicenseID":"DEF456","TypeCode":"7","State":"CO" },
      {"LicenseID":"GHI789","TypeCode":"8","State":"GA" }
    ]
  }
}

我尝试这样做,但在以下代码中被阻止:

 val customSchema = StructType(Array( StructField("Name", StringType, true), StructField("LicenseID_1", StringType, true), StructField("TypeCode_1", StringType, true), StructField("State_1", StringType, true), StructField("LicenseID_2", StringType, true), StructField("TypeCode_2", StringType, true), StructField("State_2", StringType, true), StructField("LicenseID_3", StringType, true), StructField("TypeCode_3", StringType, true), StructField("State_3", StringType, true)))
 val license = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").schema(customSchema).load("D:\test\test.csv")
 case class License(LicenseID:String, TypeCode:String, State:String)
 case class Data(Name:String, Licenses: Array[License])
 val transformedData = license.map(data => Data(data(0),Array(License(data(1),data(2),data(3)),License(data(4),data(5),data(6)),License(data(7),data(8),data(9)))))
<console>:46: error: type mismatch;
 found   : Any
 required: String
       val transformedData = license.map(data => Data(data(0),Array(License(data(1),data(2),data(3)),License(data(4),data(5),data(6)),License(data(7),data(8),data(9)))))
...

不确定您的问题是什么,添加示例如何保存数据火花和芒果

  • https://docs.mongodb.com/spark-connector/current/
  • https://docs.mongodb.com/spark-connector/current/scala-api/

    import org.apache.spark.sql.SparkSessionimport com.mongodb.spark.sql._

    val sc: SparkContext//一个现有的 SparkContext。val sparkSession = SparkSession.builder().getOrCreate()

    蒙戈火花助手val df = MongoSpark.load(sparkSession)//使用 SparkConf

sparkSession.loadFromMongoDB() // Uses the SparkConf for configuration
sparkSession.loadFromMongoDB(ReadConfig(Map("uri" -> "mongodb://example.com/database.collection"))) // Uses the ReadConfig
sparkSession.read.mongo()
sparkSession.read.format("com.mongodb.spark.sql").load()
// Set custom options:
sparkSession.read.mongo(customReadConfig)
sparkSession.read.format("com.mongodb.spark.sql").options.
 (customReadConfig.asOptions).load()

连接器提供了将数据持久化到MongoDB中的能力。

    MongoSpark.save(centenarians.write.option("collection", "hundredClub"))
    MongoSpark.load[Character](sparkSession, ReadConfig(Map("collection" -> 

"data"), Some(ReadConfig(sparkSession)))).show()

保存数据的替代方法

dataFrameWriter.write.mongo()
dataFrameWriter.write.format("com.mongodb.spark.sql").save()

添加 .toString 解决了这个问题,我能够将我想要的格式保存到 mongodb。

val transformedData = license.map(data => Data(data(0).toString,Array(License(data(1).toString,data(2).toString,data(3).toString),License(data(4).toString,data(5).toString,data(6).toString),License(data(7).toString,data(8).toString,data(9).toString))))

最新更新