用Spark为每个分区创建一个CSV



我有一个大约10GB的数据帧,应该写入一堆CSV文件,每个分区一个。

csv应按3个字段划分:"system", "date_month"one_answers"customer".

在每个文件夹中只应写入一个CSV文件,并且CSV文件中的数据应按另外两个字段排序:&;date_day&;和"date_hour".

文件系统(一个S3桶)应该是这样的:

/system=foo/date_month=2022-04/customer=CU000001/part-00000-x.c000.csv
/system=foo/date_month=2022-04/customer=CU000002/part-00000-x.c000.csv
/system=foo/date_month=2022-04/customer=CU000003/part-00000-x.c000.csv
/system=foo/date_month=2022-04/customer=CU000004/part-00000-x.c000.csv
/system=foo/date_month=2022-05/customer=CU000001/part-00000-x.c000.csv
/system=foo/date_month=2022-05/customer=CU000002/part-00000-x.c000.csv
/system=foo/date_month=2022-05/customer=CU000003/part-00000-x.c000.csv
/system=foo/date_month=2022-05/customer=CU000004/part-00000-x.c000.csv

我知道我可以很容易地实现使用coalesce(1),但这将只使用一个工人,我想避免这种情况。

我试过这个策略

mydataframe.
repartition($"system", $"date_month", $"customer").
sort("date_day", "date_hour").
write.
partitionBy("system", "date_month", "customer").
option("header", "false").
option("sep", "t").
format("csv").
save(s"s3://bucket/spool/")

我的想法是,每个worker将获得一个不同的分区,因此它可以轻松地对数据进行排序并在分区路径中写入单个文件。运行代码后,我注意到每个分区都有许多CSV,如下所示:

/system=foo/date_month=2022-05/customer=CU000001/part-00000-df027d9e-3d57-492b-b97a-daa5e80fdc93.c000.csv                                                                                                                                                                                           
/system=foo/date_month=2022-05/customer=CU000001/part-00001-df027d9e-3d57-492b-b97a-daa5e80fdc93.c000.csv                                                                                                                                                                                           
/system=foo/date_month=2022-05/customer=CU000001/part-00002-df027d9e-3d57-492b-b97a-daa5e80fdc93.c000.csv                                                                                                                                                                                           
/system=foo/date_month=2022-05/customer=CU000001/part-00003-df027d9e-3d57-492b-b97a-daa5e80fdc93.c000.csv                                                                                                                                                                                           
/system=foo/date_month=2022-05/customer=CU000001/part-00004-df027d9e-3d57-492b-b97a-daa5e80fdc93.c000.csv                                                                                                                                                                                           
/system=foo/date_month=2022-05/customer=CU000001/part-00005-df027d9e-3d57-492b-b97a-daa5e80fdc93.c000.csv                                                                                                                                                                                           
/system=foo/date_month=2022-05/customer=CU000001/part-00006-df027d9e-3d57-492b-b97a-daa5e80fdc93.c000.csv                                                                                                                                                                                           
/system=foo/date_month=2022-05/customer=CU000001/part-00007-df027d9e-3d57-492b-b97a-daa5e80fdc93.c000.csv      
[...]                                                                                                                                                                                     

每个文件中的数据按预期顺序排列,所有文件的连接将创建正确的文件,但这需要太多时间,我更喜欢依赖Spark。

是否有一种方法可以为每个分区创建一个有序的CSV文件,而不需要将所有数据移动到使用coalesce(1)的单个worker中?

我正在使用scala,如果这很重要的话。

sort()(以及orderBy())触发shuffle,因为它对整个数据框进行排序,要在分区内进行排序,您应该使用适当命名的sortWithinPartitions

mydataframe.
repartition($"system", $"date_month", $"customer").
sortWithinPartitions("date_day", "date_hour").
write.
partitionBy("system", "date_month", "customer").
option("header", "false").
option("sep", "t").
format("csv").
save(s"s3://bucket/spool/")

最新更新