无法将Spark DataFrame写入JSON文件



这是我用来将dataframe写入JSON的代码。我正在Zeppelin运行此代码:

val df = Seq((2012, 8, "Batman", 9.8), (2012, 8, "Hero", 8.7), (2012, 7, "Robot", 5.5), (2011, 7, "Git", 2.0)).toDF("year", "month", "title", "rating")
df.write.json("/tmp/out.json")

我期望的是/tmp/out.json文件中编写的数据框数据。但是,它正在使用名称"/tmp/out.json"创建目录,在其中我发现以下两个文件:

_SUCCESS  
._SUCCESS.crc

这些文件都没有JSON数据。我在这里缺少什么?

您有一些选择:

  • 写入共享位置并合并文件(不使用Spark进行合并)
  • df.rdd.collect()数据到驱动程序并写入文件。您将使用标准的Scala IO库,因此不会进行任何分区。这是一个缺点,不得不将所有数据从执行者提取到驾驶员,这取决于数据和驱动程序资源的数量,这可能是缓慢或不可行的。
  • 比收集整个数据集更好的方法是依次收集每个分区,然后将数据流传输到驱动程序上的一个文件

例如:

val rdd = df.rdd
for (p <- rdd.partitions) {
    val idx = p.index
    val partRdd = rdd.mapPartitionsWithIndex(a => if (a._1 == idx) a._2 else Iterator(), true)
    //The second argument is true to avoid rdd reshuffling
    val data = partRdd.collect //data contains all values from a single partition 
                               //in the form of array
    //Now you can do with the data whatever you want: iterate, save to a file, etc.
}

https://stackoverflow.com/a/21801828/4697497

相关内容

  • 没有找到相关文章