Spark编写带有S3自定义路径的压缩CSV



我正试图简单地使用Scala:中编写的Spark将CSV写入S3

我在输出bucket中注意到以下文件:...PROCESSED/montfh-04.csv/part-00000-723a3d72-56f6-4e62-b627-9a181a820f6a-c000.csv.snappy

当它应该仅为montfh-04.csv

代码:

val processedMetadataDf = spark.read.csv("s3://" + metadataPath + "/PROCESSED/" + "month-04" + ".csv")
val processCount = processedMetadataDf.count()
if (processCount == 0) {
// Initial frame is 0B -> Overwrite with path 
val newDat = Seq("dummy-row-data")
val unknown_df = newDat.toDF()
unknown_df.write.mode("overwrite").option("header","false").csv("s3://" + metadataPath + "/PROCESSED/" + "montfh-04" + ".csv")

}

在这里,我注意到两件奇怪的事情:

  • 它把它放在一个目录中
  • 它通过快速压缩将奇怪的部分子路径添加到文件中

我所要做的只是简单地将一个具有该名称的平面CSV文件写入指定的路径。我有什么选择?

这就是spark的工作原理。您为保存DataSet/DataFrame提供的位置是spark可以写入其所有分区的目录位置。零件文件的数量将等于分区的数量,在您的情况下,分区的数量仅为1。

现在,如果你只想让文件名为montfh-04.csv,那么你可以重命名它。

注意:S3中的重命名操作成本高昂(复制和删除(。当您使用spark编写时,它将是I/O的3倍,2倍是输出Commit操作,1倍是重命名。最好将它写在HDFS中,然后用所需的密钥名从那里上传

最新更新