How can I correctly get a column's values as a Map(k -> v), where k is a unique value and v is its occurrence count? I'm doing this within a group.
// collect_list produces a WrappedArray, so type the UDF parameter as Seq[Long], not Array[Long]
val getMapUDF = udf((arr: Seq[Long]) => arr.groupBy(identity).map { case (x, y) => x -> y.size })
df
  .withWatermark("time", "30 seconds")
  .groupBy(window(col("time"), "1 minutes").alias("someTime"), col("foo"), col("bar"))
  .agg(count("*").alias("rowCount"), collect_list(col("aaa")).alias("aaaList"))
  .withColumn("qtypes", getMapUDF(col("aaaList")))
Edit: input
+-----+-----+---------------+
| foo | bar | foobar        |
+-----+-----+---------------+
| aaa | a   | [1,1,1,2,3,3] |
| bbb | b   | [1,2,3,1,2]   |
+-----+-----+---------------+
Expected output
+-----+-----+--------------------+
| foo | bar | foobarMap          |
+-----+-----+--------------------+
| aaa | a   | [1->3, 2->1, 3->2] |
| bbb | b   | [1->2, 2->2, 3->1] |
+-----+-----+--------------------+
Q: Can I use map_from_arrays here?
Given an Array arr, this is what you want:
val arr: Array[Long] = Array(1,1,1,2,3,3)
arr.groupBy(identity).mapValues(_.size)
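As a quick plain-Scala sanity check of that expression (using map instead of mapValues, which returns a deprecated lazy view in Scala 2.13):

```scala
// Count occurrences of each distinct value with groupBy(identity).
val arr: Array[Long] = Array(1L, 1L, 1L, 2L, 3L, 3L)
val counts: Map[Long, Int] = arr.groupBy(identity).map { case (k, vs) => k -> vs.size }
println(counts) // Map(1 -> 3, 2 -> 1, 3 -> 2), entry order unspecified
```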
So, if you want to replace the UDF with the Spark SQL API / Column-based transformations, this might be what you're looking for:
val data = Seq(
  ("aaa", "a", Array(1, 1, 1, 2, 3, 3)),
  ("bbb", "b", Array(1, 2, 3, 1, 2))
)
val df = spark.createDataset(data).toDF("foo", "bar", "foobar")

val res = df.select($"foo", explode_outer($"foobar"))
  .groupBy("foo", "col") // explode_outer names its output column "col"
  .count()
  .withColumn("mapped", map($"col", $"count"))
  .groupBy("foo")
  .agg(collect_list("mapped"))
res.show(false)
So you will get this:
+---+------------------------------+
|foo|collect_list(mapped) |
+---+------------------------------+
|aaa|[[3 -> 2], [1 -> 3], [2 -> 1]]|
|bbb|[[2 -> 2], [1 -> 2], [3 -> 1]]|
+---+------------------------------+
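Regarding the map_from_arrays question: on Spark 2.4+ you could in principle stay per-row by combining map_from_arrays with the higher-order functions array_distinct, transform, filter and size, e.g. `expr("map_from_arrays(array_distinct(foobar), transform(array_distinct(foobar), x -> size(filter(foobar, y -> y = x))))")` — treat that expression as an untested sketch. The per-row logic it encodes, modelled on plain Scala collections:

```scala
// The per-row counting that map_from_arrays + array_distinct + transform
// would express in Spark SQL, written against plain Scala collections.
def toCountMap(arr: Seq[Long]): Map[Long, Int] = {
  val keys   = arr.distinct                       // array_distinct(foobar)
  val counts = keys.map(k => arr.count(_ == k))   // transform(..., x -> size(filter(...)))
  keys.zip(counts).toMap                          // map_from_arrays(keys, counts)
}
```

For the sample rows this yields Map(1 -> 3, 2 -> 1, 3 -> 2) and Map(1 -> 2, 2 -> 2, 3 -> 1), matching the expected output.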
Hope this helps in some way.
I think you can do something in place of collect_list so that you get what you want without doing two groupBys. I'm assuming your input data looks like the df below.
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window
df.show
+---+---+---+
|foo|bar|aaa|
+---+---+---+
|aaa| a| 1|
|aaa| a| 1|
|aaa| a| 1|
|aaa| a| 2|
|aaa| a| 3|
|aaa| a| 3|
|bbb| b| 1|
|bbb| b| 2|
|bbb| b| 3|
|bbb| b| 1|
|bbb| b| 2|
+---+---+---+
val df2 = df.withColumn(
  "foobarmap",
  struct(
    $"aaa",
    count("aaa").over(Window.partitionBy("foo", "bar", "aaa"))
  )
).groupBy(
  "foo", "bar"
).agg(
  count("*").alias("rowcount"),
  map_from_entries(collect_set("foobarmap")).alias("foobarmap")
).orderBy("foo")
df2.show(2,0)
+---+---+--------+------------------------+
|foo|bar|rowcount|foobarmap |
+---+---+--------+------------------------+
|aaa|a |6 |[2 -> 1, 3 -> 2, 1 -> 3]|
|bbb|b |5 |[2 -> 2, 3 -> 1, 1 -> 2]|
+---+---+--------+------------------------+
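To see why struct + count over a window + collect_set + map_from_entries produces that map, here is the same logic modelled on plain Scala collections (the Row case class and sample data mirror the df above):

```scala
// Model of the window-function approach: pair each aaa with its
// within-(foo, bar)-group count, then dedupe the pairs into a map.
case class Row(foo: String, bar: String, aaa: Long)

val rows = Seq(
  Row("aaa", "a", 1), Row("aaa", "a", 1), Row("aaa", "a", 1),
  Row("aaa", "a", 2), Row("aaa", "a", 3), Row("aaa", "a", 3),
  Row("bbb", "b", 1), Row("bbb", "b", 2), Row("bbb", "b", 3),
  Row("bbb", "b", 1), Row("bbb", "b", 2)
)

val result: Map[(String, String), (Int, Map[Long, Int])] =
  rows.groupBy(r => (r.foo, r.bar)).map { case (key, grp) =>
    // count("aaa").over(Window.partitionBy("foo", "bar", "aaa")) per row:
    val entries = grp.map(r => r.aaa -> grp.count(_.aaa == r.aaa))
    // collect_set + map_from_entries (toMap removes duplicate entries):
    (key, (grp.size, entries.toMap))
  }
// result(("aaa", "a")) == (6, Map(1 -> 3, 2 -> 1, 3 -> 2))
```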
To add the watermark and group by window, the following should work (note: window functions over Window.partitionBy are not supported on streaming Datasets, so this variant only applies to batch jobs):
val df2 = df.withWatermark(
  "time", "30 seconds"
).withColumn(
  "foobarmap",
  struct(
    $"aaa",
    // mixing Column and String arguments does not compile, so use col(...) throughout
    count("aaa").over(Window.partitionBy(window(col("time"), "1 minutes"), col("foo"), col("bar"), col("aaa")))
  )
).groupBy(
  window(col("time"), "1 minutes"), col("foo"), col("bar")
).agg(
  count("*").alias("rowcount"),
  map_from_entries(collect_set("foobarmap")).alias("foobarmap")
).orderBy("foo")