I am aggregating based on a groupBy condition and applying some filters on my existing Spark/Scala DataFrame. However, when executing my code I get a "cannot resolve 'flag' given input columns" error:
val someDF = Seq(
(1, 111,100,100,"C","5th","Y",11),
(1, 111,100,100,"C","5th","Y",11),
(2, 222,200,200,"C","5th","Y",22),
(2, 222,200,200,"C","5th","Y",22)
).toDF("id","rollno","sub1","sub2","flag","class","status","sno")
var df2 = someDF.groupBy("id","rollno")
.agg(sum("sub1").alias("sub1"),sum("sub2").alias("sub2"))
.filter(col("flag") === "C")
.filter(length(col("rollno")) >= 2)
.filter(col("class") === ("5th") || col("class") === ("6th"))
.filter(substring(col("rollno"), 1, 2) === col("sno"))
.filter(col("status") === "Y")
.select("id", "rollno", "sub1", "sub2", "flag", "class", "sno", "status")
Error:
org.apache.spark.sql.AnalysisException: cannot resolve '`flag`' given input columns: [id, rollno, sub1, sub2];; 'Filter ('flag = C)
Expected result:
+---+------+----+----+----+-----+------+---+
| id|rollno|sub1|sub2|flag|class|status|sno|
+---+------+----+----+----+-----+------+---+
| 1| 111| 200| 200| C| 5th| Y| 11|
| 2| 222| 400| 400| C| 5th| Y| 22|
+---+------+----+----+----+-----+------+---+
After the aggregation, the other columns are gone, so they can no longer be filtered on. You need to filter first and then group. If you want to keep the other columns in the output, you also need to group by them.
import org.apache.spark.sql.functions._  // col, sum, length, substring

var df2 = someDF
.filter(col("flag") === "C")
.filter(length(col("rollno")) >= 2)
.filter(col("class") === ("5th") || col("class") === ("6th"))
.filter(substring(col("rollno"), 1, 2) === col("sno"))
.filter(col("status") === "Y")
.groupBy("id", "rollno", "flag", "class", "sno", "status")
.agg(sum("sub1").alias("sub1"),sum("sub2").alias("sub2"))
.select("id", "rollno", "sub1", "sub2", "flag", "class", "sno", "status")
df2.show
+---+------+----+----+----+-----+---+------+
| id|rollno|sub1|sub2|flag|class|sno|status|
+---+------+----+----+----+-----+---+------+
| 1| 111| 200| 200| C| 5th| 11| Y|
| 2| 222| 400| 400| C| 5th| 22| Y|
+---+------+----+----+----+-----+---+------+
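If you would rather keep the groupBy key narrow, another option is to carry the extra columns through the aggregation with `first()`. This is a sketch, and it assumes that `flag`, `class`, `sno`, and `status` are constant within each `(id, rollno)` group (as they are in your sample data); if they can vary within a group, grouping by them as above is the safer choice.

```scala
import org.apache.spark.sql.functions._

// Sketch: keep the extra columns via first() instead of widening the groupBy key.
// Assumes flag/class/sno/status are constant within each (id, rollno) group.
val df3 = someDF
  .filter(col("flag") === "C" && col("status") === "Y")
  .filter(col("class") === "5th" || col("class") === "6th")
  .groupBy("id", "rollno")
  .agg(
    sum("sub1").alias("sub1"),
    sum("sub2").alias("sub2"),
    first("flag").alias("flag"),
    first("class").alias("class"),
    first("sno").alias("sno"),
    first("status").alias("status")
  )
```

Note that `first()` is non-deterministic when the values within a group differ, so only use this when the columns really are constant per group.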