Given the following dataframe in Spark:
Name,LicenseID_1,TypeCode_1,State_1,LicenseID_2,TypeCode_2,State_2,LicenseID_3,TypeCode_3,State_3
"John","123ABC",1,"WA","456DEF",2,"FL","789GHI",3,"CA"
"Jane","ABC123",5,"AZ","DEF456",7,"CO","GHI789",8,"GA"
How can I use Scala in Spark to write this to MongoDB as a collection of documents, like so:
{ "Name" : "John",
  "Licenses" : [
    { "LicenseID" : "123ABC", "TypeCode" : "1", "State" : "WA" },
    { "LicenseID" : "456DEF", "TypeCode" : "2", "State" : "FL" },
    { "LicenseID" : "789GHI", "TypeCode" : "3", "State" : "CA" }
  ]
},
{ "Name" : "Jane",
  "Licenses" : [
    { "LicenseID" : "ABC123", "TypeCode" : "5", "State" : "AZ" },
    { "LicenseID" : "DEF456", "TypeCode" : "7", "State" : "CO" },
    { "LicenseID" : "GHI789", "TypeCode" : "8", "State" : "GA" }
  ]
}
I tried this, but got stuck on the following code:
val customSchema = StructType(Array(
  StructField("Name", StringType, true),
  StructField("LicenseID_1", StringType, true),
  StructField("TypeCode_1", StringType, true),
  StructField("State_1", StringType, true),
  StructField("LicenseID_2", StringType, true),
  StructField("TypeCode_2", StringType, true),
  StructField("State_2", StringType, true),
  StructField("LicenseID_3", StringType, true),
  StructField("TypeCode_3", StringType, true),
  StructField("State_3", StringType, true)))
val license = sqlContext.read
  .format("com.databricks.spark.csv")
  .option("header", "true")
  .schema(customSchema)
  .load("D:\\test\\test.csv") // backslashes must be escaped in a Scala string literal
case class License(LicenseID:String, TypeCode:String, State:String)
case class Data(Name:String, Licenses: Array[License])
val transformedData = license.map(data => Data(data(0),Array(License(data(1),data(2),data(3)),License(data(4),data(5),data(6)),License(data(7),data(8),data(9)))))
<console>:46: error: type mismatch;
found : Any
required: String
val transformedData = license.map(data => Data(data(0),Array(License(data(1),data(2),data(3)),License(data(4),data(5),data(6)),License(data(7),data(8),data(9)))))
...
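The mismatch arises because Spark's `Row.apply(i)` is typed as `Any`, while the `License` and `Data` constructors require `String`. A minimal self-contained sketch of the distinction, using a hypothetical `FakeRow` stand-in for Spark's `Row` so it runs without Spark:

```scala
// FakeRow mimics Spark's Row: apply() is untyped, getString() is typed.
case class FakeRow(values: Any*) {
  def apply(i: Int): Any = values(i)                            // returns Any
  def getString(i: Int): String = values(i).asInstanceOf[String] // returns String
}

object RowTypingDemo extends App {
  val row = FakeRow("John", "123ABC")

  // val name: String = row(0)        // would not compile: found Any, required String
  val name: String = row.getString(0) // typed accessor compiles
  val id: String   = row(1).toString  // .toString also works, as in the accepted fix

  println(s"$name / $id")             // John / 123ABC
}
```

The same applies to the real `Row`: use `row.getString(i)` (or `row.getAs[String](i)`) instead of the untyped `row(i)`.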
Not sure exactly what your problem is, but here is an example of how to save data with Spark and MongoDB:
- https://docs.mongodb.com/spark-connector/current/
- https://docs.mongodb.com/spark-connector/current/scala-api/
import org.apache.spark.sql.SparkSession
import com.mongodb.spark.sql._

val sc: SparkContext // An existing SparkContext.
val sparkSession = SparkSession.builder().getOrCreate()

// Load via the MongoSpark helper
val df = MongoSpark.load(sparkSession) // Uses the SparkConf
Reading from MongoDB:
sparkSession.loadFromMongoDB() // Uses the SparkConf for configuration
sparkSession.loadFromMongoDB(ReadConfig(Map("uri" -> "mongodb://example.com/database.collection"))) // Uses the ReadConfig
sparkSession.read.mongo()
sparkSession.read.format("com.mongodb.spark.sql").load()
// Set custom options:
sparkSession.read.mongo(customReadConfig)
sparkSession.read.format("com.mongodb.spark.sql")
  .options(customReadConfig.asOptions).load()
The connector also provides the ability to persist data into MongoDB:
MongoSpark.save(centenarians.write.option("collection", "hundredClub"))
MongoSpark.load[Character](sparkSession, ReadConfig(Map("collection" ->
"data"), Some(ReadConfig(sparkSession)))).show()
Alternative ways to save data:
val dataFrameWriter = df.write
dataFrameWriter.mongo()
dataFrameWriter.format("com.mongodb.spark.sql").save()
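As an alternative to mapping over rows with case classes, the nested structure the question asks for can also be built directly with the DataFrame functions `array` and `struct`. A sketch, assuming the `license` DataFrame loaded from the CSV above; the `"licenses"` collection name is made up for illustration:

```scala
import com.mongodb.spark.MongoSpark
import org.apache.spark.sql.functions.{array, col, struct}

// One struct per license slot, generated from the index so a fourth
// slot would only require extending the range.
val slots = (1 to 3).map { i =>
  struct(
    col(s"LicenseID_$i").as("LicenseID"),
    col(s"TypeCode_$i").as("TypeCode"),
    col(s"State_$i").as("State"))
}

// Collapse the wide columns into a single array column named "Licenses".
val nested = license.select(col("Name"), array(slots: _*).as("Licenses"))

// Persist the nested documents; collection name is illustrative.
MongoSpark.save(nested.write.option("collection", "licenses").mode("overwrite"))
```

Each struct in the array becomes a sub-document in MongoDB, yielding the `{ "Name": ..., "Licenses": [ ... ] }` shape shown in the question.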
Adding .toString solved the problem, and I was able to save the data to MongoDB in the format I wanted.
val transformedData = license.map(data => Data(
  data(0).toString,
  Array(
    License(data(1).toString, data(2).toString, data(3).toString),
    License(data(4).toString, data(5).toString, data(6).toString),
    License(data(7).toString, data(8).toString, data(9).toString))))
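For reference, the row-to-record step can be factored into a pure helper so it is testable outside Spark; a sketch reusing the `License`/`Data` case classes from the question (the `toData` helper is hypothetical):

```scala
case class License(LicenseID: String, TypeCode: String, State: String)
case class Data(Name: String, Licenses: Array[License])

// Pure helper: turn one 10-field row (already as strings) into a Data record.
def toData(fields: IndexedSeq[String]): Data =
  Data(
    fields(0),
    Array(
      License(fields(1), fields(2), fields(3)),
      License(fields(4), fields(5), fields(6)),
      License(fields(7), fields(8), fields(9))))

// In Spark this would be applied with typed getters instead of .toString:
//   val transformedData = license.map(row => toData((0 to 9).map(row.getString)))
```

Either form produces the same records; `getString` simply avoids routing each value through `Any` and back.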