将spark dataFrame作为单个CSV文件(无文件夹)写入S3



我的要求很简单,我需要将我的spark DataFrame作为指定名称的单个csv文件写入S3现在写,我使用的是.confluence(1(,它将所有数据放在一个CSV中,但仍然创建了一个包含一些附加文件的文件夹,并且主CSV文件的名称是一些id。[我使用的是java/scala]

dataFrame.coalesce(1).write.mode(SaveMode.Overwrite).option("header", "true").csv("s3a://<mypath>")

就是这样保存数据的

我认为您可以收集记录并通过驱动程序保存,因为您正在合并1,所以在任何情况下都需要将所有记录传输到一个节点。

但在将其收集到本地之前,我认为最好将数据帧转换为数据集。

只需做一些类似的事情:

dataframe.as[TypeToDefine].collect()

然后你得到了一个TypeToDefine数组,你可以用你想要的名称使用任何流行的java/scala-csv库来编写你的csv。

我想这就是您的要求。

import org.apache.hadoop.fs.Path
val s3Path: String = ???      // full S3 path & file name
val textToWrite: String = ??? // collect your dataframe and convert to a single String
val path = new Path(s3Path)
val fs = path.getFileSystem(spark.sparkContext.hadoopConfiguration)
val out = fs.create(path, true)
out.write( textToWrite.getBytes )
out.close()

最新更新