我有一个包含数百万条记录的数据框架,需要使用glue作业将数据分区到s3桶文件夹中,小于200MB或200,000行。使用partitionBy将不起作用,因为没有列值可以分割数据,从而使分区的大小低于下游进程所需的大小。我尝试添加单调递增id和基于id的预定义范围写入,但这不会工作,因为单调递增_id不是连续的。如何将分区数据写入小于200mb的s3文件夹中,或者是否有办法通过重新分区的数据框架进行分区
val newdf = diffDF.withColumn("id", monotonically_increasing_id())
var batchSize = 100000
var totalRecordCount = diffDF.count()
var currentRow = 0
while(currentRow < totalRecordCount){
var segmentDF = newdf.where(col("id") >= currentRow and col("id") < (currentRow + batchSize ))
.drop("id")
segmentDF.write.option("header","true").mode(SaveMode.Overwrite).csv(tpath + "/" + currentRow)
currentRow = currentRow + batchSize
}
这是一个Scala-ish解决方案,它使用折叠,我尝试将相同的逻辑适应到spark中,而spark rdd现在拥有的最类似的东西是rdd.aggregate
,其参数列表中的combineOp只是破坏了一切!因此,如果您对使用RDD
s感到满意,那么这种方法或spark中的类似方法将为您工作:
val rdd = df.rdd
rdd.collect().foldLeft(List.empty[List[Row]]) {
case (l@(headAggregator :: tail), newRow) =>
// this if represents rdd size, so instead of list.length you can capture rdd size
if ((newRow :: headAggregator).length < 3) (newRow :: headAggregator) :: tail
else (newRow :: Nil) :: l
case (Nil, newRow) =>
(newRow :: Nil) :: Nil
}
我知道,这个rdd.collect()
实际上非常昂贵,但我只是实现了逻辑,所以如果你发现类似于RDD
s的foldLeft,只需复制并粘贴函数体:)
我最后做的是添加一列,这是id值的除法的余数。
val diffDF = .withColumn("partitionnum", col("Employee_ID") % 9) .write.option("header","true").partitionBy("partitionnum").mode(SaveMode.Overwrite).csv(tpath)
这将提供9个分区,并且是高度可定制的。对于5个分区,可以除以5,等等