嗨,我有我的 Spark 数据帧的输出,它可以创建文件夹结构并创建部分文件。现在我必须合并文件夹中的所有零件文件并将该文件重命名为文件夹路径名。
这就是我进行分区的方式
df.write.partitionBy("DataPartition","PartitionYear")
.format("csv")
.option("nullValue", "")
.option("header", "true")/
.option("codec", "gzip")
.save("hdfs:///user/zeppelin/FinancialLineItem/output")
它像这样创建文件夹结构
hdfs:///user/zeppelin/FinancialLineItem/output/DataPartition=Japan/PartitionYear=1971/part-00001-87a61115-92c9-4926-a803-b46315e55a08.c000.csv.gz
hdfs:///user/zeppelin/FinancialLineItem/output/DataPartition=Japan/PartitionYear=1971/part-00002-87a61115-92c9-4926-a803-b46315e55a08.c001.csv.gz
我必须像这样创建最终文件
hdfs:///user/zeppelin/FinancialLineItem/output/Japan.1971.currenttime.csv.gz
这里没有零件文件bith 001和002合并为两个一。
我的数据大小非常大 300 GB gzip 和 35 GB 压缩,所以coalesce(1) and repartition
变得非常慢。
我在这里看到了一个解决方案使用 spark-csv 编写单个 CSV 文件,但我无法实现它,请帮助我。
重新分区抛出错误
error: value repartition is not a member of org.apache.spark.sql.DataFrameWriter[org.apache.spark.sql.Row]
dfMainOutputFinalWithoutNull.write.repartition("DataPartition","StatementTypeCode")
从 Spark 外部的头节点尝试此操作...
hdfs dfs -getmerge <src> <localdst>
https://hadoop.apache.org/docs/r1.2.1/file_system_shell.html#getmerge
"将源目录和目标文件作为输入,并将 src 中的文件连接到目标本地文件中。(可选)可以设置 addnl 以允许在每个文件的末尾添加换行符。