I am trying this:
df=dfFromJson:
{"class":"name 1","stream":"science"}
{"class":"name 1","stream":"arts"}
{"class":"name 1","stream":"science"}
{"class":"name 1","stream":"law"}
{"class":"name 1","stream":"law"}
{"class":"name 2","stream":"science"}
{"class":"name 2","stream":"arts"}
{"class":"name 2","stream":"law"}
{"class":"name 2","stream":"science"}
{"class":"name 2","stream":"arts"}
{"class":"name 2","stream":"law"}
df.groupBy("class").agg(count(col("stream")==="science") as "stream_science", count(col("stream")==="arts") as "stream_arts", count(col("stream")==="law") as "stream_law")
This does not give the expected output. What is the fastest way to achieve it?
It is not entirely clear what the expected output is, but I guess you want something like this:
import org.apache.spark.sql.functions.{count, col, when}
val streams = df.select($"stream").distinct.collect.map(_.getString(0))
val exprs = streams.map(s => count(when($"stream" === s, 1)).alias(s"stream_$s"))
df
.groupBy("class")
.agg(exprs.head, exprs.tail: _*)
// +------+--------------+----------+-----------+
// | class|stream_science|stream_law|stream_arts|
// +------+--------------+----------+-----------+
// |name 1| 2| 2| 1|
// |name 2| 2| 2| 2|
// +------+--------------+----------+-----------+
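The `count(when(...))` trick above counts, per group, only the rows where the condition holds. As a sanity check of that logic (no Spark needed), here is a plain-Python sketch over the same eleven rows from the question:

```python
from collections import Counter

# The eleven (class, stream) rows from the question's JSON.
rows = [
    ("name 1", "science"), ("name 1", "arts"), ("name 1", "science"),
    ("name 1", "law"), ("name 1", "law"),
    ("name 2", "science"), ("name 2", "arts"), ("name 2", "law"),
    ("name 2", "science"), ("name 2", "arts"), ("name 2", "law"),
]

# count(when($"stream" === s, 1)) per class is just a count of
# (class, stream) pairs, one stream_<s> column per distinct stream.
counts = Counter(rows)
streams = sorted({s for _, s in rows})
result = {
    cls: {f"stream_{s}": counts[(cls, s)] for s in streams}
    for cls in sorted({c for c, _ in rows})
}
print(result)
```

This reproduces the same numbers as the Spark output above: name 1 has two science, one arts, and two law rows; name 2 has two of each.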
If you don't care about the names and have only a single grouping column, you can simply use DataFrameStatFunctions.crosstab:
df.stat.crosstab("class", "stream")
// +------------+---+----+-------+
// |class_stream|law|arts|science|
// +------------+---+----+-------+
// | name 1| 2| 1| 2|
// | name 2| 2| 2| 2|
// +------------+---+----+-------+
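For intuition, pandas offers an analogous `pd.crosstab` that builds the same class-by-stream contingency table (using pandas here is my own illustration, not part of the Spark answer):

```python
import pandas as pd

# The same eleven rows as the question, rebuilt as a pandas frame.
pdf = pd.DataFrame(
    [("name 1", "science"), ("name 1", "arts"), ("name 1", "science"),
     ("name 1", "law"), ("name 1", "law"),
     ("name 2", "science"), ("name 2", "arts"), ("name 2", "law"),
     ("name 2", "science"), ("name 2", "arts"), ("name 2", "law")],
    columns=["class", "stream"],
)

# pd.crosstab counts co-occurrences of the two columns,
# mirroring df.stat.crosstab("class", "stream") in Spark.
ct = pd.crosstab(pdf["class"], pdf["stream"])
print(ct)
```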
Instead of grouping by a single column and then filtering, you can group by both columns. Since I'm not fluent enough in Scala, here is the snippet in Python. Note that I've changed your column names from "stream" and "class" to "dept" and "name" to avoid conflicts with Spark's own "stream" and "class" names.
from pyspark.sql import HiveContext, Row
hc = HiveContext(sc)
obj = [
{"class":"name 1","stream":"science"},
{"class":"name 1","stream":"arts"},
{"class":"name 1","stream":"science"},
{"class":"name 1","stream":"law"},
{"class":"name 1","stream":"law"},
{"class":"name 2","stream":"science"},
{"class":"name 2","stream":"arts"},
{"class":"name 2","stream":"law"},
{"class":"name 2","stream":"science"},
{"class":"name 2","stream":"arts"},
{"class":"name 2","stream":"law"}
]
rdd = sc.parallelize(obj).map(lambda i: Row(dept=i['stream'], name=i['class']))
df = hc.createDataFrame(rdd)
df.groupby(df.dept, df.name).count().collect()
This produces the following output:
[
Row(dept='science', name='name 1', count=2),
Row(dept='science', name='name 2', count=2),
Row(dept='arts', name='name 1', count=1),
Row(dept='arts', name='name 2', count=2),
Row(dept='law', name='name 1', count=2),
Row(dept='law', name='name 2', count=2)
]
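The long format above is not quite the wide shape the question asked for, but it is easy to pivot afterwards. A plain-Python sketch of that last step, assuming the six (dept, name, count) rows from the output above:

```python
from collections import defaultdict

# The (dept, name, count) rows produced by groupby(dept, name).count().
rows = [
    ("science", "name 1", 2), ("science", "name 2", 2),
    ("arts", "name 1", 1), ("arts", "name 2", 2),
    ("law", "name 1", 2), ("law", "name 2", 2),
]

# Pivot to one dict per name with a stream_<dept> key per dept,
# matching the wide shape requested in the question.
wide = defaultdict(dict)
for dept, name, count in rows:
    wide[name][f"stream_{dept}"] = count
print(dict(wide))
```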