我们有一个常见的用例,按照行的创建顺序对表进行重复数据消除。
例如,我们有一个用户操作的事件日志。用户不时标记他最喜欢的类别。在我们的分析阶段,我们只想知道用户最后喜欢的类别。
示例数据:
id action_type value date
123 fav_category 1 2016-02-01
123 fav_category 4 2016-02-02
123 fav_category 8 2016-02-03
123 fav_category 2 2016-02-04
我们只想根据日期栏获得最新的更新。我们当然可以在sql:中完成
select * from (
select *, row_number() over (
partition by id,action_type order by date desc) as rnum from tbl
)
where rnum=1;
但是,在映射器端,它并不是部分聚合的,我们将把所有数据混洗到reducer中。
我发布了一个关于这个问题的Jira SPARK-17662,它以一个更好的SQL风格建议结束:
select id,
action_type,
max(struct(date, *)) last_record
from tbl
group by id,action_type
虽然这种解决方案要干净得多,但仍然存在两个问题:
- 如果其中一个字段不可排序(如map<>),则此技巧不起作用
- 如果稍后在流中只选择一些字段,我们将无法获得下推谓词来优化流,并从一开始就忽略不需要的字段
我们最终为此编写了一个UDAF,它克服了问题#1,但仍然存在问题#2。
有人知道更好的解决方案吗?
适用于任何想要我们当前解决方案的人。这是UDAF的代码-注意,我们必须使用一些内部函数,所以我们在org.apache.spark.sql.types:包中
package org.apache.spark.sql.types
case class MaxValueByKey(child1: Expression, child2: Expression) extends DeclarativeAggregate {
override def children: Seq[Expression] = child1 :: child2 :: Nil
override def nullable: Boolean = true
// Return data type.
override def dataType: DataType = child2.dataType
// Expected input data type.
override def inputTypes: Seq[AbstractDataType] = Seq(AnyDataType, AnyDataType)
override def checkInputDataTypes(): TypeCheckResult =
TypeUtils.checkForOrderingExpr(child1.dataType, "function max")
private lazy val max = AttributeReference("max", child1.dataType)()
private lazy val data = AttributeReference("data", child2.dataType)()
override lazy val aggBufferAttributes: Seq[AttributeReference] = max :: data :: Nil
override lazy val initialValues: Seq[Expression] = Seq(
Literal.create(null, child1.dataType),
Literal.create(null, child2.dataType)
)
override lazy val updateExpressions: Seq[Expression] =
chooseKeyValue(max, data, child1, child2)
override lazy val mergeExpressions: Seq[Expression] =
chooseKeyValue(max.left, data.left, max.right, data.right)
def chooseKeyValue(key1:Expression, value1: Expression, key2:Expression, value2: Expression) = Seq(
If(IsNull(key1), key2, If(IsNull(key2), key1, If(GreaterThan(key1, key2), key1, key2))),
If(IsNull(key1), value2, If(IsNull(key2), value1, If(GreaterThan(key1, key2), value1, value2)))
)
override lazy val evaluateExpression: AttributeReference = data
}
object SparkMoreUDAFs {
def maxValueByKey(key: Column, value: Column): Column =
Column(MaxValueByKey(key.expr, value.expr).toAggregateExpression(false))
}
用法是:
sqlContext.table("tbl").groupBy($"id",$"action_type")
.agg(SparkMoreUDAFs.maxValueByKey($"date", expr("struct(date,*)")).as("s"))
我不确定它是否非常优雅,但它完成了映射端的部分聚合,适用于所有列类型。此外,我认为这个UDAF本身也是有用的。
希望它能帮助到别人。。
当键可排序时,UDAF(看起来相当不错的BTW)可以工作。这也应该适用于max(struct(key, value))
(如果不适用,请告诉我)。地图目前无法订购,我已经做了一些初步工作(https://github.com/apache/spark/pull/15970),但它可能需要一种更全面的方法。
你能详细说明下推谓词吗?我很好奇那里发生了什么。