我有许多过滤器需要应用于Spark中的数据框架,但是它首先是在运行时,我知道对用户的过滤器。目前,我将它们添加到单个filter
函数中,但是如果未定义的一个filtes之一
myDataFrame
.filter(_filter1)
.filter(_filter2)
.filter(_filter3)...
如果不需要的话,我真的找不到在运行时的动态如何排除FX _filter2
?
我应该通过创建一个大过滤器来做到这一点:
var filter = _filter1
if (_filter2 != null)
filter = filter.and(_filter2)
...
或者在我找不到的火花中有很好的模式?
一种可能的解决方案是默认所有 filters
to lit(true)
:
import org.apache.spark.sql.functions._
val df = Seq(1, 2, 3).toDF("x")
val filter_1 = lit(true)
val filter_2 = col("x") > 1
val filter_3 = lit(true)
val filtered = df.filter(filter_1).filter(filter_2).filter(filter_3)
这将使 null
脱离您的代码,而在执行计划中将详细介绍真实的谓词:
filtered.explain
== Physical Plan ==
*Project [value#1 AS x#3]
+- *Filter (value#1 > 1)
+- LocalTableScan [value#1]
您当然可以使它变得更简单,并且可以使一系列谓词:
import org.apache.spark.sql.Column
val preds: Seq[Column] = Seq(lit(true), col("x") > 1, lit(true))
df.where(preds.foldLeft(lit(true))(_ and _))
,如果正确实施,请完全跳过占位符。
一开始我会摆脱零过滤器:
val filters:List[A => Boolean] = nullableFilters.filter(_!=null)
然后将功能定义为链过滤器:
def chainFilters[A](filters:List[A => Boolean])(v:A) = filters.forall(f => f(v))
现在您可以简单地将过滤器应用于DF:
df.filter(chainFilters(nullableFilters.filter(_!=null))
为什么不:
var df = // load
if (_filter2 != null) {
df = df.filter(_filter2)
}
etc
另外,创建过滤器列表:
var df = // load
val filters = Seq (filter1, filter2, filter3, ...)
filters.filter(_ != null).foreach (x => df = df.filter(x))
//对不起,如果代码有一些错误,这是一个想法 - 目前我无法测试代码