Spark SQL二次筛选和分组



问题:我有一个数据集a{filed1,field2,field3…},我想先用field1对a进行分组,然后在每个结果组中,我想进行一堆子查询,例如,计算具有field2 == true的行数,或计算具有field4 == "some_value"field5 == false的不同field3的数量,等等。

我能想到的一些替代方案:我可以编写一个自定义的用户定义聚合函数,该函数接受一个计算过滤条件的函数,但这样我必须为每个查询条件创建一个实例。我还研究了countDistinct函数可以实现一些操作,但我不知道如何使用它来实现过滤器的不同计数语义。

在Pig中,我可以做到:

FOREACH (GROUP A by field1) {
        field_a = FILTER A by field2 == TRUE;
        field_b = FILTER A by field4 == 'some_value' AND field5 == FALSE;
        field_c = DISTINCT field_b.field3;
        GENERATE  FLATTEN(group),
                  COUNT(field_a) as fa,
                  COUNT(field_b) as fb,
                  COUNT(field_c) as fc,

有没有一种方法可以在Spark SQL中做到这一点?

排除不同计数,这可以通过简单的求和条件来解决:

import org.apache.spark.sql.functions.sum
val df = sc.parallelize(Seq(
  (1L, true, "x", "foo", true), (1L, true, "y", "bar", false), 
  (1L, true, "z", "foo", true), (2L, false, "y", "bar", false), 
  (2L, true, "x", "foo", false)
)).toDF("field1", "field2", "field3", "field4", "field5")
val left = df.groupBy($"field1").agg(
  sum($"field2".cast("int")).alias("fa"),
  sum(($"field4" === "foo" && ! $"field5").cast("int")).alias("fb")
)
left.show
// +------+---+---+
// |field1| fa| fb|
// +------+---+---+
// |     1|  3|  0|
// |     2|  1|  1|
// +------+---+---+

不幸的是,这要棘手得多。Spark SQL中的GROUP BY子句不会对数据进行物理分组。更不用说寻找不同的元素是相当昂贵的。也许你能做的最好的事情是分别计算不同的计数,并简单地将结果相加:

val right = df.where($"field4" === "foo" && ! $"field5")
  .select($"field1".alias("field1_"), $"field3")
  .distinct
  .groupBy($"field1_")
  .agg(count("*").alias("fc"))
val joined = left
  .join(right, $"field1" === $"field1_", "leftouter")
  .na.fill(0)

使用UDAF来计算每个条件的不同值肯定是一种选择,但有效的实现将相当棘手。从内部表示转换相当昂贵,并且使用集合存储实现快速UDAF也不便宜。如果你能接受近似的解决方案,你可以在那里使用bloom过滤器。

最新更新