在scala中写入数据框架分区到自定义目录



我有一个包含数百万条记录的数据框架,需要使用glue作业将数据分区到s3桶文件夹中,小于200MB或200,000行。使用partitionBy将不起作用,因为没有列值可以分割数据,从而使分区的大小低于下游进程所需的大小。我尝试添加单调递增id和基于id的预定义范围写入,但这不会工作,因为单调递增_id不是连续的。如何将分区数据写入小于200mb的s3文件夹中,或者是否有办法通过重新分区的数据框架进行分区

val newdf = diffDF.withColumn("id", monotonically_increasing_id())                    
var batchSize = 100000
var totalRecordCount = diffDF.count()
var currentRow = 0        
while(currentRow < totalRecordCount){
var segmentDF = newdf.where(col("id") >= currentRow and col("id") < (currentRow + batchSize ))
.drop("id")
segmentDF.write.option("header","true").mode(SaveMode.Overwrite).csv(tpath + "/" + currentRow)
currentRow = currentRow + batchSize
}  

这是一个Scala-ish解决方案,它使用折叠,我尝试将相同的逻辑适应到spark中,而spark rdd现在拥有的最类似的东西是rdd.aggregate,其参数列表中的combineOp只是破坏了一切!因此,如果您对使用RDDs感到满意,那么这种方法或spark中的类似方法将为您工作:

val rdd = df.rdd
rdd.collect().foldLeft(List.empty[List[Row]]) {
case (l@(headAggregator :: tail), newRow) =>
// this if represents rdd size, so instead of list.length you can capture rdd size
if ((newRow :: headAggregator).length < 3) (newRow :: headAggregator)  :: tail
else (newRow :: Nil) :: l
case (Nil, newRow) =>
(newRow :: Nil) :: Nil
}

我知道,这个rdd.collect()实际上非常昂贵,但我只是实现了逻辑,所以如果你发现类似于RDDs的foldLeft,只需复制并粘贴函数体:)

我最后做的是添加一列,这是id值的除法的余数。

val diffDF = .withColumn("partitionnum", col("Employee_ID") % 9) .write.option("header","true").partitionBy("partitionnum").mode(SaveMode.Overwrite).csv(tpath)

这将提供9个分区,并且是高度可定制的。对于5个分区,可以除以5,等等

相关内容

  • 没有找到相关文章

最新更新