在数据框架上任意数量的过滤器数



我有许多过滤器需要应用于Spark中的数据框架,但是它首先是在运行时,我知道对用户的过滤器。目前,我将它们添加到单个filter函数中,但是如果未定义的一个filtes之一

,那将失败
myDataFrame
    .filter(_filter1)
    .filter(_filter2)
    .filter(_filter3)...

如果不需要的话,我真的找不到在运行时的动态如何排除FX _filter2

我应该通过创建一个大过滤器来做到这一点:

var filter = _filter1
if (_filter2 != null)
    filter = filter.and(_filter2)
...

或者在我找不到的火花中有很好的模式?

一种可能的解决方案是默认所有 filters to lit(true)

import org.apache.spark.sql.functions._
val df = Seq(1, 2, 3).toDF("x")
val filter_1 = lit(true)
val filter_2 = col("x") > 1
val filter_3 = lit(true)
val filtered = df.filter(filter_1).filter(filter_2).filter(filter_3)

这将使 null脱离您的代码,而在执行计划中将详细介绍真实的谓词:

filtered.explain
== Physical Plan ==
*Project [value#1 AS x#3]
+- *Filter (value#1 > 1)
   +- LocalTableScan [value#1]

您当然可以使它变得更简单,并且可以使一系列谓词:

import org.apache.spark.sql.Column
val preds: Seq[Column] = Seq(lit(true), col("x") > 1, lit(true))
df.where(preds.foldLeft(lit(true))(_ and _))

,如果正确实施,请完全跳过占位符。

一开始我会摆脱零过滤器:

val filters:List[A => Boolean] = nullableFilters.filter(_!=null)

然后将功能定义为链过滤器:

def chainFilters[A](filters:List[A => Boolean])(v:A) = filters.forall(f => f(v))

现在您可以简单地将过滤器应用于DF:

df.filter(chainFilters(nullableFilters.filter(_!=null))

为什么不:

var df = // load
if (_filter2 != null) {
    df = df.filter(_filter2)
}
etc

另外,创建过滤器列表:

var df = // load
val filters = Seq (filter1, filter2, filter3, ...)
filters.filter(_ != null).foreach (x => df = df.filter(x))

//对不起,如果代码有一些错误,这是一个想法 - 目前我无法测试代码

相关内容

  • 没有找到相关文章

最新更新