根据条件拆分Spark DataFrame



我需要类似于randomSplit函数的东西:

val Array(df1, df2) = myDataFrame.randomSplit(Array(0.6, 0.4))

但是,我需要根据布尔条件拆分myDataFrame。是否存在类似以下内容的内容?

val Array(df1, df2) = myDataFrame.booleanSplit(col("myColumn") > 100)

我不想做两个单独的筛选调用。

不幸的是,DataFrame API没有这样的方法,要按条件拆分,您必须执行两个单独的filter转换:

myDataFrame.cache() // recommended to prevent repeating the calculation
val condition = col("myColumn") > 100
val df1 = myDataFrame.filter(condition)
val df2 = myDataFrame.filter(not(condition))

我知道缓存和过滤两次看起来有点难看,但请记住,DataFrames被转换为RDD,它们的评估是惰性的,即只有在操作中直接或间接使用它们时。

如果存在问题中建议的方法booleanSplit,则结果将被转换为两个RDD,每个RDD都将被延迟评估。两个RDD中的一个将首先进行评估,另一个将严格在第一个之后进行第二次评估。在评估第一个RDD时,第二个RDD还没有"出现"(编辑:刚刚注意到API RDD有一个类似的问题,答案给出了类似的推理)

为了实际获得任何性能优势,第二个RDD必须在第一个RDD的迭代期间(或者,实际上,在两者的父RDD迭代期间,由第一个RDD的迭代触发)(部分)保持。IMO这与RDD API的其余部分的设计不太一致。不确定性能提升是否能证明这一点。

我认为,最好的方法是避免在业务代码中直接编写两个筛选器调用,方法是使用方法booleanSplit编写一个隐式类作为实用程序。方法以与Tzach Zohar的答案类似的方式完成这一部分,可能使用类似于myDataFrame.withColumn("__condition_value", condition).cache()的方法,这样条件的值就不会计算两次。

相关内容

  • 没有找到相关文章

最新更新