如何在条件表达式保存在dataframe列中的spark数据帧上应用条件表达式筛选器



我有很多DataFrames,每个DataFrames都可以有单独的过滤器来过滤数据。过滤器也是预定义的。我计划创建一个组合数据帧,它将包含作为列之一的筛选表达式。在这个组合的数据帧中,我需要应用过滤器,它是数据行本身的一部分。例如

如果我有3个像这样的数据帧

val ausDF = Seq(
("australia", "Steve Smith", "batter"),
("australia", "David Warner", "batter"),
("australia", "Pat Cummins", "bowler")
).toDF("country", "player", "speciality")
val indDF = Seq(
("india", "Rohit Sharma", "batsman"),
("india", "Virat Kohli", "batsman"),
("india", "Jaspreet Bumrah", "bowler")
).toDF("country", "player", "speciality")
val engDF = Seq(
("england", "Jos Buttler", "bat"),
("england", "Joe Root", "bat"),
("england", "James Anderson", "bowl")
).toDF("country", "player", "speciality")

我可以做一个联合来创建一个像这样的组合数据帧

val cricketersDF = ausDF.union(indDF).union(engDF)

如果有像这样的过滤器数据帧

val batsmanFilter = Seq(
("australia", "speciality == "batter""),
("india", "speciality == "batsman""),
("england", "speciality == "bat"")
).toDF("country", "filter")

然后我可以加入这2个DataFrames

val batsmanFilterDF = cricketersDF.join(batsmanFilter, "country")

它给了我一个带有像这样的过滤器的数据帧

+---------+---------------+----------+-----------------------+
|country  |player         |speciality|filter                 |
+---------+---------------+----------+-----------------------+
|australia|Steve Smith    |batter    |speciality == "batter" |
|australia|David Warner   |batter    |speciality == "batter" |
|australia|Pat Cummins    |bowler    |speciality == "batter" |
|india    |Rohit Sharma   |batsman   |speciality == "batsman"|
|india    |Virat Kohli    |batsman   |speciality == "batsman"|
|india    |Jaspreet Bumrah|bowler    |speciality == "batsman"|
|england  |Jos Buttler    |bat       |speciality == "bat"    |
|england  |Joe Root       |bat       |speciality == "bat"    |
|england  |James Anderson |bowl      |speciality == "bat"    |
+---------+---------------+----------+-----------------------+

现在,我想要的是应用filter列中提供的过滤器来获得所需的结果。类似于这个的东西

batsmanFilterDF.filter(col("filter"))

然而,这给了我一个错误,

Exception in thread "main" org.apache.spark.sql.AnalysisException: filter expression '`filter`' of type string is not a boolean.;;
Filter filter#45: string

所以,我想知道有没有一种方法可以使用dataframe列中指定的值来使用基于条件表达式的过滤?

AFAIK,无法将列中包含的复杂筛选器应用于数据帧。如果过滤器很简单,我们可以设计一个技巧,但你似乎说过滤器可能很复杂。

如果数据帧batsmanFilter很小,则可以从驱动程序设计并应用筛选器。它会是这样的:

val filter = batsmanFilter
.collect
.map(row => (row.getAs[String]("country"), row.getAs[String]("filter")))
.map{ case (country, filter) =>
"((country == "" + country + "") and (" + filter + "))"
}
.reduce(_ + " or " + _)
cricketersDF.where(filter).show

它产生了你所期望的:

+---------+------------+----------+
|  country|      player|speciality|
+---------+------------+----------+
|australia| Steve Smith|    batter|
|australia|David Warner|    batter|
|    india|Rohit Sharma|   batsman|
|    india| Virat Kohli|   batsman|
|  england| Jos Buttler|       bat|
|  england|    Joe Root|       bat|
+---------+------------+----------+

这种方法的优点是只应用一个滤波器。然而,只有当batsmanFilter数据帧相当小时,这才会起作用。如果不是这样,我们也可以解决一些问题,但我们需要更多地了解我们能找到的过滤器类型。