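I have many DataFrames, and each one can have its own filter for selecting data. The filters are predefined. I plan to create a combined DataFrame that contains the filter expression as one of its columns. In this combined DataFrame, I need to apply the filter that is stored in the data row itself. For example,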
suppose I have 3 DataFrames like these:
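import spark.implicits._ // needed for .toDF; assumes a SparkSession named spark, as in spark-shell
val ausDF = Seq(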
("australia", "Steve Smith", "batter"),
("australia", "David Warner", "batter"),
("australia", "Pat Cummins", "bowler")
).toDF("country", "player", "speciality")
val indDF = Seq(
("india", "Rohit Sharma", "batsman"),
("india", "Virat Kohli", "batsman"),
("india", "Jaspreet Bumrah", "bowler")
).toDF("country", "player", "speciality")
val engDF = Seq(
("england", "Jos Buttler", "bat"),
("england", "Joe Root", "bat"),
("england", "James Anderson", "bowl")
).toDF("country", "player", "speciality")
I can use a union to create a combined DataFrame like this:
val cricketersDF = ausDF.union(indDF).union(engDF)
If there is a filter DataFrame like this:
val batsmanFilter = Seq(
("australia", "speciality == \"batter\""),
("india", "speciality == \"batsman\""),
("england", "speciality == \"bat\"")
).toDF("country", "filter")
then I can join the two DataFrames:
val batsmanFilterDF = cricketersDF.join(batsmanFilter, "country")
This gives me a DataFrame with a filter on every row, like this:
+---------+---------------+----------+-----------------------+
|country |player |speciality|filter |
+---------+---------------+----------+-----------------------+
|australia|Steve Smith |batter |speciality == "batter" |
|australia|David Warner |batter |speciality == "batter" |
|australia|Pat Cummins |bowler |speciality == "batter" |
|india |Rohit Sharma |batsman |speciality == "batsman"|
|india |Virat Kohli |batsman |speciality == "batsman"|
|india |Jaspreet Bumrah|bowler |speciality == "batsman"|
|england |Jos Buttler |bat |speciality == "bat" |
|england |Joe Root |bat |speciality == "bat" |
|england |James Anderson |bowl |speciality == "bat" |
+---------+---------------+----------+-----------------------+
Now, what I want is to apply the filter given in the filter column to get the desired result. Something like this:
import org.apache.spark.sql.functions.col
batsmanFilterDF.filter(col("filter"))
However, this gives me an error:
Exception in thread "main" org.apache.spark.sql.AnalysisException: filter expression '`filter`' of type string is not a boolean.;;
Filter filter#45: string
So, is there a way to filter a DataFrame with a conditional expression whose text is stored as a value in one of its columns?
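AFAIK, there is no way to apply complex filters stored in a column to a DataFrame. If the filters were simple, we could devise a trick, but you seem to say that the filters may be complex.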
If the batsmanFilter DataFrame is small, you can build the filter on the driver and apply it there. It would look like this:
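// Collect the (small) filter table to the driver as (country, filter) pairs
val filter = batsmanFilter
.collect
.map(row => (row.getAs[String]("country"), row.getAs[String]("filter")))
// One clause per country: ((country == "<country>") and (<filter>))
.map{ case (country, filter) =>
"((country == \"" + country + "\") and (" + filter + "))"
}
// OR all clauses together into a single SQL predicate string
.reduce(_ + " or " + _)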
cricketersDF.where(filter).show
It produces what you expect:
+---------+------------+----------+
| country| player|speciality|
+---------+------------+----------+
|australia| Steve Smith| batter|
|australia|David Warner| batter|
| india|Rohit Sharma| batsman|
| india| Virat Kohli| batsman|
| england| Jos Buttler| bat|
| england| Joe Root| bat|
+---------+------------+----------+
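The driver-side string building can be isolated and checked without Spark. Below is a minimal plain-Scala sketch of that step; `FilterBuilder`, `quote` and `combine` are hypothetical names. Escaping embedded quotes and backslashes is an addition not in the answer, and it assumes Spark SQL's default treatment of backslash escapes in string literals. The sketch also returns `None` for an empty filter table, where `reduce` would throw.

```scala
// Plain-Scala sketch of the driver-side step: turn (country, filter) pairs
// into one SQL predicate string. No Spark dependency is needed for this part.
object FilterBuilder {
  // Wrap a value in double quotes as a SQL string literal, escaping
  // backslashes and embedded double quotes (assumption: Spark's default
  // backslash-escape handling for string literals is in effect).
  def quote(s: String): String =
    "\"" + s.replace("\\", "\\\\").replace("\"", "\\\"") + "\""

  // Build one clause per country, ((country == "<c>") and (<filter>)),
  // and join them with " or ". Returns None for an empty input instead of
  // throwing, as reduce would on an empty collection.
  def combine(filters: Seq[(String, String)]): Option[String] =
    if (filters.isEmpty) None
    else Some(
      filters
        .map { case (country, f) => s"((country == ${quote(country)}) and ($f))" }
        .mkString(" or ")
    )
}
```

With Spark, you would collect the pairs as in the answer, pass them as a `Seq` (for example via `.toSeq` on the collected array), and call `cricketersDF.where(predicate)` when the result is `Some(predicate)`.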
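The advantage of this approach is that only one filter is applied. However, it only works if the batsmanFilter DataFrame is reasonably small. If it is not, we could still work around the problem, but we would need to know more about the kinds of filters to expect.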
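One possible version of the "simple filter" trick, assuming (hypothetically) that every filter is an equality test of the form `column == "value"`: parse the value out of the filter string and match it against the data. It is shown with plain Scala collections so it runs without Spark; `SimpleFilters`, `parse`, `select` and `Player` are illustrative names. In Spark, the analogous idea would be to extract the value into a column of `batsmanFilter` (for example with `regexp_extract`) and join on `country` and `speciality`, which avoids collecting the filter table to the driver.

```scala
// Sketch of the "simple filter" trick, assuming every filter has the exact
// shape  <column> == "<value>". Anything else is rejected, not guessed at.
object SimpleFilters {
  // Full-string match: optional spaces, a column name, ==, a quoted value.
  private val Equality = """\s*(\w+)\s*==\s*"([^"]*)"\s*""".r

  final case class Player(country: String, player: String, speciality: String)

  // Parse a filter string into (column, value), or None if it is not simple.
  def parse(filter: String): Option[(String, String)] = filter match {
    case Equality(column, value) => Some((column, value))
    case _                       => None
  }

  // Keep the players whose speciality equals the value parsed from their
  // country's filter. Countries whose filter does not parse, or tests a
  // column other than speciality, contribute no rows.
  def select(players: Seq[Player], filters: Map[String, String]): Seq[Player] = {
    val wanted: Map[String, String] =
      filters.toSeq.flatMap { case (country, f) =>
        parse(f).collect { case ("speciality", v) => country -> v }
      }.toMap
    players.filter(p => wanted.get(p.country).contains(p.speciality))
  }
}
```

Restricting the accepted shape is the design choice that makes this scale: once each filter reduces to a (country, value) pair, the problem becomes an ordinary equi-join instead of evaluating arbitrary expressions per row.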