Spark Scala: counter of unique values in a column



How do I correctly get the values of a column as a Map(k -> v), where k is a unique value and v is its occurrence count? I am doing this inside a grouping.

// Spark passes array columns to Scala UDFs as Seq, not Array
val getMapUDF = udf((arr: Seq[Long]) => arr.groupBy(identity).map { case (x, y) => x -> y.size })

df
  .withWatermark("time", "30 seconds")
  .groupBy(window(col("time"), "1 minutes").alias("someTime"), col("foo"), col("bar"))
  .agg(count("*").alias("rowCount"), collect_list(col("aaa")).alias("aaaList"))
  .withColumn("qtypes", getMapUDF(col("aaaList"))) // apply the UDF to the collected list
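For reference, a minimal sketch of that UDF applied to a static frame (the array column is named foobar to match the input shown in the edit below; the sample rows are assumptions):

import org.apache.spark.sql.functions._
import spark.implicits._

// Hypothetical sample data; the array column arrives in the UDF as Seq[Long]
val sample = Seq(("aaa", "a", Seq(1L, 1L, 1L, 2L, 3L, 3L))).toDF("foo", "bar", "foobar")
sample.withColumn("foobarMap", getMapUDF($"foobar")).show(false)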

Edit: input

+-----+-----+---------------+
| foo | bar | foobar        |
+-----+-----+---------------+
| aaa | a   | [1,1,1,2,3,3] |
| bbb | b   | [1,2,3,1,2]   |
+-----+-----+---------------+

Expected output

+-----+-----+--------------------+
| foo | bar | foobarMap          |
+-----+-----+--------------------+
| aaa | a   | [1->3, 2->1, 3->2] |
| bbb | b   | [1->2, 2->2, 3->1] |
+-----+-----+--------------------+

Q: Can I use map_from_arrays here?
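One hedged sketch of how map_from_arrays could be used, assuming Spark 2.4+ for the SQL higher-order functions (transform, filter) and the foobar column from the input above: pair the distinct values with per-value counts and zip them into a map.

import org.apache.spark.sql.functions._

// Keys: distinct values; values: the count of each key in the original array.
// transform/filter are written via expr, since the Scala-side higher-order
// function API only exists from Spark 3.0.
val mapped = df
  .withColumn("keys", array_distinct(col("foobar")))
  .withColumn("counts", expr("transform(keys, k -> size(filter(foobar, x -> x = k)))"))
  .withColumn("foobarMap", map_from_arrays(col("keys"), col("counts")))
  .drop("keys", "counts")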

Given an Array arr, this is what you want:

val arr: Array[Long] = Array(1,1,1,2,3,3)
arr.groupBy(identity).mapValues(_.size)
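Note that on Scala 2.13, mapValues returns a lazy MapView; a minimal equivalent that materializes a strict Map:

// Scala 2.13: go through .view and force back to a strict Map
val counts: Map[Long, Int] = arr.groupBy(identity).view.mapValues(_.size).toMap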

So, if you want to replace the UDF with Spark SQL API / Column transformations, this might be what you want:

import org.apache.spark.sql.functions._
import spark.implicits._

val data = Seq(
  ("aaa", "a", Array(1, 1, 1, 2, 3, 3)),
  ("bbb", "b", Array(1, 2, 3, 1, 2))
)

val df = spark.createDataset(data).toDF("foo", "bar", "foobar")

// Explode the array, count occurrences per (foo, value), wrap each
// (value, count) pair in a single-entry map, then collect the maps per group
val res = df.select($"foo", explode_outer($"foobar"))
  .groupBy("foo", "col").count()
  .withColumn("mapped", map($"col", $"count"))
  .groupBy("foo")
  .agg(collect_list("mapped"))

res.show(false)

So you will receive this:

+---+------------------------------+
|foo|collect_list(mapped)          |
+---+------------------------------+
|aaa|[[3 -> 2], [1 -> 3], [2 -> 1]]|
|bbb|[[2 -> 2], [1 -> 2], [3 -> 1]]|
+---+------------------------------+
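If you need one map per group, as in the expected output, rather than a list of single-entry maps, a variant of the same pipeline using map_from_entries (Spark 2.4+) should do it:

// Collect (value, count) structs and convert them into a single map per group
val res2 = df.select($"foo", explode_outer($"foobar"))
  .groupBy("foo", "col").count()
  .groupBy("foo")
  .agg(map_from_entries(collect_list(struct($"col", $"count"))).alias("foobarMap"))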

Hope this helps somehow.

I think collect_list can be replaced with something else, so that you get what you want without doing two groupBys. I am assuming your input data looks like the df below:

import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window
import spark.implicits._

df.show
+---+---+---+
|foo|bar|aaa|
+---+---+---+
|aaa|  a|  1|
|aaa|  a|  1|
|aaa|  a|  1|
|aaa|  a|  2|
|aaa|  a|  3|
|aaa|  a|  3|
|bbb|  b|  1|
|bbb|  b|  2|
|bbb|  b|  3|
|bbb|  b|  1|
|bbb|  b|  2|
+---+---+---+
// For each row, attach the count of its "aaa" value within the (foo, bar)
// group, then deduplicate the (value, count) pairs with collect_set and
// turn them into a map
val df2 = df.withColumn(
  "foobarmap",
  struct(
    $"aaa",
    count("aaa").over(Window.partitionBy("foo", "bar", "aaa"))
  )
).groupBy(
  "foo", "bar"
).agg(
  count("*").alias("rowcount"),
  map_from_entries(collect_set("foobarmap")).alias("foobarmap")
).orderBy("foo")

df2.show(2, 0)
+---+---+--------+------------------------+
|foo|bar|rowcount|foobarmap               |
+---+---+--------+------------------------+
|aaa|a  |6       |[2 -> 1, 3 -> 2, 1 -> 3]|
|bbb|b  |5       |[2 -> 2, 3 -> 1, 1 -> 2]|
+---+---+--------+------------------------+

To add the watermark and group by time window, the code can be adapted as follows:

// Same idea: partition the count window by the time window as well, and add
// the time window to the final groupBy
val df2 = df.withWatermark(
  "time", "30 seconds"
).withColumn(
  "foobarmap",
  struct(
    $"aaa",
    count("aaa").over(Window.partitionBy(window(col("time"), "1 minutes"), "foo", "bar", "aaa"))
  )
).groupBy(
  window(col("time"), "1 minutes"), "foo", "bar"
).agg(
  count("*").alias("rowcount"),
  map_from_entries(collect_set("foobarmap")).alias("foobarmap")
).orderBy("foo")
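A hypothetical static frame for trying the windowed version out (the time column name matches the code above; the sample rows and timestamps are assumptions):

import java.sql.Timestamp
import spark.implicits._

// All rows fall into the same one-minute window on purpose
val df = Seq(
  ("aaa", "a", 1L, Timestamp.valueOf("2023-01-01 00:00:05")),
  ("aaa", "a", 1L, Timestamp.valueOf("2023-01-01 00:00:10")),
  ("aaa", "a", 2L, Timestamp.valueOf("2023-01-01 00:00:15")),
  ("bbb", "b", 1L, Timestamp.valueOf("2023-01-01 00:00:20"))
).toDF("foo", "bar", "aaa", "time")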
