我创建了以下案例类:
case class Data(ads:Option[Ads])
case class Ads(subject: Option[String]
, body:Option[String]
, price:Option[Int]
, location:Option[Location]
, attribut:Option[Seq[Attribut]]
)
case class Location(city:Option[String]
, zipcode:Option[String])
case class Attribut(key_label:Option[String]
, value_label:Option[String]
)
我用play框架解析了一个JSON格式(HTML的一部分(。
我终于获得了一个对象广告
JsSuccess(Ads(Some("Subject"), SOme("Body"), Some(Price), Some(Location(Some("City"), Some("Zipcode")), Some(Attribut("key_label", "value_label"))
我想以以下方式将其保存在CSV文件中:
Subject Body Price City Zipcode Key_Label Value_Label
Play Playing games 532 Geneve 95 GEN Gen2
我将对象转换为Ads(Some("Subject"), Some("Body"), Some(Price), Some(Location(Some("City"), Some("Zipcode")), Some(Attribut("key_label", "value_label")
的列表,并将此列表转换为DataFrame。
但是我只有一列Value,它包含了对象的所有元素。
Value
(Some("Subject"), SOme("Body"), Some(Price), Some(Location(Some("City"), Some("Zipcode")), Some(Attribut("key_label", "value_label")
有人有主意吗?我真的不明白如何将scala对象与数据集和数据帧链接起来。谢谢你的帮助。
注释很有帮助,但通用的扁平化函数可能不会按所需顺序输出列,和/或处理将数组元素放入自己单独的列中。
假设您的JSON文件包含以下行:
{"ads": {"subject": "abc", "body": "doing something", "price": 13, "location": {"city": "Houston", "zipcode": 39014}, "attribut": [{"key_label": "a", "value_label": "b"}]}}
如果文件相当一致,并且您已经将Spark作为依赖项包含在内,则可能不需要使用单独的库来解析JSON。
您将需要使用CCD_ 2函数来处理以下事实:;attribut";列是一个列表。如果列表可能为空,但您希望保留其他列的值,请改用explode_outer
函数。
import org.apache.spark.sql.functions._
// assuming spark is the Spark Session
val df = spark.read.json("mydata.json")
val df1 = df.select(col("ads.subject").alias("Subject"), col("ads.body").alias("Body"),
col("ads.location.city").alias("City"), col("ads.location.zipcode").alias("Zipcode"),
explode(col("ads.attribut")))
val resultDF = df1.select(col("Subject"), col("Body"), col("City"), col("Zipcode"),
col("col.key_label"), col("col.value_label"))
resultDF.show
将输出:
+-------+---------------+-------+-------+---------+-----------+
|Subject| Body| City|Zipcode|key_label|value_label|
+-------+---------------+-------+-------+---------+-----------+
| abc|doing something|Houston| 39014| a| b|
+-------+---------------+-------+-------+---------+-----------+
在指定的目录中输出为单个CSV文件,并带有标题:
resultDF.repartition(1).write.option("header", "true").csv("/tmp/my-output-dir/")