通常,组中的所有行都会传递给聚合函数。我想使用一个条件来过滤行,这样一个组中只有一些行被传递给聚合函数。这样的操作在PostgreSQL中是可能的。我想对Spark SQL DataFrame(Spark 2.0.0(做同样的事情
代码可能看起来像这样:
val df = ... // some data frame
df.groupBy("A").agg(
max("B").where("B").less(10), // there is no such method as `where` :(
max("C").where("C").less(5)
)
对于这样的数据帧:
| A | B | C |
| 1| 14| 4|
| 1| 9| 3|
| 2| 5| 6|
结果是:
|A|max(B)|max(C)|
|1| 9| 4|
|2| 5| null|
Spark SQL有可能吗?
注意,通常可以使用除max
之外的任何其他聚合函数,并且可以在具有任意过滤条件的同一列上存在多个聚合。
val df = Seq(
(1,14,4),
(1,9,3),
(2,5,6)
).toDF("a","b","c")
val aggregatedDF = df.groupBy("a")
.agg(
max(when($"b" < 10, $"b")).as("MaxB"),
max(when($"c" < 5, $"c")).as("MaxC")
)
aggregatedDF.show
>>> df = sc.parallelize([[1,14,1],[1,9,3],[2,5,6]]).map(lambda t: Row(a=int(t[0]),b=int(t[1]),c=int(t[2]))).toDF()
>>> df.registerTempTable('t')
>>> res = sqlContext.sql("select a,max(case when b<10 then b else null end) mb,max(case when c<5 then c else null end) mc from t group by a")
+---+---+----+
| a| mb| mc|
+---+---+----+
| 1| 9| 3|
| 2| 5|null|
+---+---+----+
你可以使用sql(我相信你在Postgres中也做了同样的事情?(
df.groupBy("name","age","id").agg(functions.max("age").$less(20),functions.max("id").$less("30")).show();
样本数据:
name age id
abc 23 1001
cde 24 1002
efg 22 1003
ghi 21 1004
ijk 20 1005
klm 19 1006
mno 18 1007
pqr 18 1008
rst 26 1009
tuv 27 1010
pqr 18 1012
rst 28 1013
tuv 29 1011
abc 24 1015
输出:
+----+---+----+---------------+--------------+
|name|age| id|(max(age) < 20)|(max(id) < 30)|
+----+---+----+---------------+--------------+
| rst| 26|1009| false| true|
| abc| 23|1001| false| true|
| ijk| 20|1005| false| true|
| tuv| 29|1011| false| true|
| efg| 22|1003| false| true|
| mno| 18|1007| true| true|
| tuv| 27|1010| false| true|
| klm| 19|1006| true| true|
| cde| 24|1002| false| true|
| pqr| 18|1008| true| true|
| abc| 24|1015| false| true|
| ghi| 21|1004| false| true|
| rst| 28|1013| false| true|
| pqr| 18|1012| true| true|
+----+---+----+---------------+--------------+