问题:我有一个数据集a{filed1,field2,field3…},我想先用field1
对a进行分组,然后在每个结果组中,我想进行一堆子查询,例如,计算具有field2 == true
的行数,或计算具有field4 == "some_value"
和field5 == false
的不同field3
的数量,等等。
我能想到的一些替代方案:我可以编写一个自定义的用户定义聚合函数,该函数接受一个计算过滤条件的函数,但这样我必须为每个查询条件创建一个实例。我还研究了countDistinct
函数可以实现一些操作,但我不知道如何使用它来实现过滤器的不同计数语义。
在Pig中,我可以做到:
FOREACH (GROUP A by field1) {
field_a = FILTER A by field2 == TRUE;
field_b = FILTER A by field4 == 'some_value' AND field5 == FALSE;
field_c = DISTINCT field_b.field3;
GENERATE FLATTEN(group),
COUNT(field_a) as fa,
COUNT(field_b) as fb,
COUNT(field_c) as fc,
有没有一种方法可以在Spark SQL中做到这一点?
排除不同计数,这可以通过简单的求和条件来解决:
import org.apache.spark.sql.functions.sum
val df = sc.parallelize(Seq(
(1L, true, "x", "foo", true), (1L, true, "y", "bar", false),
(1L, true, "z", "foo", true), (2L, false, "y", "bar", false),
(2L, true, "x", "foo", false)
)).toDF("field1", "field2", "field3", "field4", "field5")
val left = df.groupBy($"field1").agg(
sum($"field2".cast("int")).alias("fa"),
sum(($"field4" === "foo" && ! $"field5").cast("int")).alias("fb")
)
left.show
// +------+---+---+
// |field1| fa| fb|
// +------+---+---+
// | 1| 3| 0|
// | 2| 1| 1|
// +------+---+---+
不幸的是,这要棘手得多。Spark SQL中的GROUP BY
子句不会对数据进行物理分组。更不用说寻找不同的元素是相当昂贵的。也许你能做的最好的事情是分别计算不同的计数,并简单地将结果相加:
val right = df.where($"field4" === "foo" && ! $"field5")
.select($"field1".alias("field1_"), $"field3")
.distinct
.groupBy($"field1_")
.agg(count("*").alias("fc"))
val joined = left
.join(right, $"field1" === $"field1_", "leftouter")
.na.fill(0)
使用UDAF来计算每个条件的不同值肯定是一种选择,但有效的实现将相当棘手。从内部表示转换相当昂贵,并且使用集合存储实现快速UDAF也不便宜。如果你能接受近似的解决方案,你可以在那里使用bloom过滤器。