我有一个巨大的JSON文件,其中的一小部分如下:
{
"socialNews": [{
"adminTagIds": "",
"fileIds": "",
"departmentTagIds": "",
........
........
"comments": [{
"commentId": "",
"newsId": "",
"entityId": "",
....
....
}]
}]
.....
}
我在socialNews上应用了横向视图爆炸:
val rdd = sqlContext.jsonFile("file:///home/ashish/test")
rdd.registerTempTable("social")
val result = sqlContext.sql("select * from social LATERAL VIEW explode(socialNews) social AS comment")
现在我想把这个结果(DataFrame)转换回JSON并保存到一个文件中,但是我找不到任何Scala API来进行转换。有没有什么标准库可以做到这一点,或者有什么方法可以解决这个问题?
val result: DataFrame = sqlContext.read.json(path)
result.write.json("/yourPath")
方法write
在类DataFrameWriter中,应该可以在DataFrame
对象上访问。只需确保您的rdd类型为DataFrame
,而不是已弃用的SchemaRdd
类型。您可以显式地提供类型定义val data: DataFrame
或使用toDF()
强制转换为dataFrame。
如果你有一个DataFrame有一个API转换回包含json记录的RDD[String]。
val df = Seq((2012, 8, "Batman", 9.8), (2012, 8, "Hero", 8.7), (2012, 7, "Robot", 5.5), (2011, 7, "Git", 2.0)).toDF("year", "month", "title", "rating")
df.toJSON.saveAsTextFile("/tmp/jsonRecords")
df.toJSON.take(2).foreach(println)
这应该从Spark 1.4以后可用。在您创建的结果DataFrame上调用API。
这里列出了可用的api
sqlContext.read().json(dataFrame.toJSON())
当您以
方式运行spark作业时--master local --deploy-mode client
然后,df.write.json('path/to/file/data.json')
作品。
如果你运行在集群[头节点]上,[--master yarn --deploy-mode cluster
]更好的方法是将数据写入aws s3或azure blob并从中读取。
df.write.json('s3://bucket/path/to/file/data.json')
works
如果您仍然无法找到将Dataframe转换为JSON的方法,您可以使用to_json或toJSON内置的Spark函数。
让我知道,如果你有一个样本数据帧和JSON格式转换。