Here is a sample dataset. How can I dynamically remove IDs based on the column Status = "Removed" (no hard-coded values)?
Sample dataset:
ID Status Date Amount
1 New 01/05/20 20
1 Assigned 02/05/20 30
1 In-Progress 02/05/20 50
2 New 02/05/20 30
2 Removed 03/05/20 20
3 In-Progress 09/05/20 50
3 Removed 09/05/20 20
4 New 10/05/20 20
4 Assigned 10/05/20 30
Expected result:
ID Status Date Amount
1 New 01/05/20 20
1 Assigned 02/05/20 30
1 In-Progress 02/05/20 50
4 New 10/05/20 20
4 Assigned 10/05/20 30
Thanks in advance.
You can use filter (or where) with not like / not rlike to filter out the records with status = removed from the dataframe.
import org.apache.spark.sql.functions._

//assuming df is the dataframe
//using filter (or where); trim removes white space, lower converts to lower case
val df1 = df.filter(lower(trim(col("status"))) =!= "removed")

//or filter on the literal status "Removed"; this won't match if you have mixed case
val df1 = df.filter(col("status") =!= "Removed")

//using not like
val df1 = df.filter(!lower(col("status")).like("removed"))

//using not rlike (case-insensitive regex)
val df1 = df.filter(!col("status").rlike(".*(?i)removed.*"))
Now the df1 dataframe will contain the required records.
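For reference, assuming df holds the sample data from the question, the simple filter above still keeps the non-Removed rows of IDs 2 and 3 (row order may vary), which is why the join/window approaches below are needed:
df1.show()
//+---+-----------+--------+------+
//| ID|     Status|    Date|Amount|
//+---+-----------+--------+------+
//|  1|        New|01/05/20|    20|
//|  1|   Assigned|02/05/20|    30|
//|  1|In-Progress|02/05/20|    50|
//|  2|        New|02/05/20|    30|
//|  3|In-Progress|09/05/20|    50|
//|  4|        New|10/05/20|    20|
//|  4|   Assigned|10/05/20|    30|
//+---+-----------+--------+------+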
UPDATE:
From Spark 2.4:
In this case we can use a join or a window clause.
val df=Seq((1,"New","01/05/20","20"),(1,"Assigned","02/05/20","30"),(1,"In-Progress","02/05/20","50"),(2,"New","02/05/20","30"),(2,"Removed","03/05/20","20"),(3,"In-Progress","09/05/20","50"),(3,"Removed","09/05/20","20"),(4,"New","10/05/20","20"),(4,"Assigned","10/05/20","30")).toDF("ID","Status","Date","Amount")
import org.apache.spark.sql.expressions._

//collect all (lower-cased) status values per id
val df1 = df.
  groupBy("id").
  agg(collect_list(lower(col("Status"))).alias("status_arr"))

//using the array_contains function to keep only ids whose status list has no "removed"
df.alias("t1").join(df1.alias("t2"), Seq("id"), "inner").
  filter(!array_contains(col("status_arr"), "removed")).
  drop("status_arr").
  show()
//without a join, using a window clause
val w = Window.partitionBy("id").orderBy("Status").rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
df.withColumn("status_arr", collect_list(lower(col("status"))).over(w)).
  filter(!array_contains(col("status_arr"), "removed")).
  drop("status_arr").
  show()
//+---+-----------+--------+------+
//| ID|     Status|    Date|Amount|
//+---+-----------+--------+------+
//|  1|        New|01/05/20|    20|
//|  1|   Assigned|02/05/20|    30|
//|  1|In-Progress|02/05/20|    50|
//|  4|        New|10/05/20|    20|
//|  4|   Assigned|10/05/20|    30|
//+---+-----------+--------+------+
For Spark < 2.4:
val df1=df.groupBy("id").agg(concat_ws("",collect_list(lower(col("Status")))).alias("status_arr"))
df.alias("t1").join(df1.alias("t2"),Seq("id"),"inner").
filter(!col("status_arr").contains("removed")).
drop(col("status_arr")).
show()
//using window functions (w is the window defined above)
df.withColumn("status_arr", concat_ws("", collect_list(lower(col("status"))).over(w))).
  filter(!col("status_arr").contains("removed")).
  drop(col("status_arr")).
  show(false)
//+---+-----------+--------+------+
//| ID|     Status|    Date|Amount|
//+---+-----------+--------+------+
//|  1|        New|01/05/20|    20|
//|  1|   Assigned|02/05/20|    30|
//|  1|In-Progress|02/05/20|    50|
//|  4|        New|10/05/20|    20|
//|  4|   Assigned|10/05/20|    30|
//+---+-----------+--------+------+
Assuming res0 is your dataset, you can do:
import spark.implicits._
val x = res0.where($"Status" =!= "Removed")
x.show()
This will remove the rows whose status is Removed, but based on what you posted above it will not give you what you are trying to achieve.
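If the goal is to drop every ID that has at least one Removed row (as the expected result suggests), here is a minimal sketch using a left anti join, assuming res0 has the columns shown in the question:
import spark.implicits._

//IDs that have at least one row with Status = "Removed"
val removedIds = res0.where($"Status" === "Removed").select("ID").distinct()

//left_anti keeps only the rows of res0 whose ID does not appear in removedIds
val kept = res0.join(removedIds, Seq("ID"), "left_anti")
kept.show()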