这是我用来将dataframe写入JSON的代码。我正在Zeppelin运行此代码:
val df = Seq((2012, 8, "Batman", 9.8), (2012, 8, "Hero", 8.7), (2012, 7, "Robot", 5.5), (2011, 7, "Git", 2.0)).toDF("year", "month", "title", "rating")
df.write.json("/tmp/out.json")
我期望的是/tmp/out.json文件中编写的数据框数据。但是,它正在使用名称"/tmp/out.json"创建目录,在其中我发现以下两个文件:
_SUCCESS
._SUCCESS.crc
这些文件都没有JSON数据。我在这里缺少什么?
您有一些选择:
- 写入共享位置并合并文件(不使用Spark进行合并)
-
df.rdd.collect()
数据到驱动程序并写入文件。您将使用标准的Scala IO库,因此不会进行任何分区。这是一个缺点,不得不将所有数据从执行者提取到驾驶员,这取决于数据和驱动程序资源的数量,这可能是缓慢或不可行的。 - 比收集整个数据集更好的方法是依次收集每个分区,然后将数据流传输到驱动程序上的一个文件 上
例如:
val rdd = df.rdd
for (p <- rdd.partitions) {
val idx = p.index
val partRdd = rdd.mapPartitionsWithIndex(a => if (a._1 == idx) a._2 else Iterator(), true)
//The second argument is true to avoid rdd reshuffling
val data = partRdd.collect //data contains all values from a single partition
//in the form of array
//Now you can do with the data whatever you want: iterate, save to a file, etc.
}
https://stackoverflow.com/a/21801828/4697497