我想知道在哪种情况下,火花将作为UDAF函数的一部分进行合并。
动机:我在Spark Project的窗口上使用了许多UDAF功能。我通常想回答一个问题:
与30天的当前交易在同一国家进行了多少次信用卡交易?
窗口将从当前事务开始,但不会在计数中包含它。它需要当前交易的价值才能知道过去30天内要计算哪个国家。
val rollingWindow = Window
.partitionBy(partitionByColumn)
.orderBy(orderByColumn.desc)
.rangeBetween(0, windowSize)
df.withColumn(
outputColumnName,
customUDAF(inputColumn, orderByColumn).over(rollingWindow))
我写了我的Customudaf来进行计数。我始终使用.orderBy(orderByColumn.desc)
,并且由于.desc
,当前交易在计算过程中首先出现在窗口中。
UDAF函数需要实现merge
函数,该功能在并行计算中合并了两个中间聚合缓冲区。如果发生任何合并,我的current transaction
对于不同的缓冲区可能不相同,UDAF的结果将不正确。
我编写了一个UDAF函数,该功能计算我的数据集中的合并数量,并且仅保留与当前事务的窗口中的第一个交易。
class FirstUDAF() extends UserDefinedAggregateFunction {
def inputSchema = new StructType().add("x", StringType)
.add("y", StringType)
def bufferSchema = new StructType()
.add("first", StringType)
.add("numMerge", IntegerType)
def dataType = new StructType()
.add("firstCode", StringType)
.add("numMerge", IntegerType)
def deterministic = true
def initialize(buffer: MutableAggregationBuffer) = {
buffer(0) = ""
buffer(1) = 1
}
def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
if (buffer.getString(0) == "")
buffer(0) = input.getString(0)
}
def merge(buffer1: MutableAggregationBuffer, buffer2: Row) = {
buffer1(1) = buffer1.getInt(1) + buffer2.getInt(1)
}
def evaluate(buffer: Row) = buffer
}
当我用Spark 2.0.1在具有16 CPU的本地主机上运行时,从来没有任何合并,并且窗口中的第一个交易始终是当前事务。这就是我要的。在不久的将来,我将在X100较大的数据集和实际分布式Spark群集上运行代码,并想知道是否可以在此处进行合并。
问题:
- 在UDAF发生在哪些情况下/Conditons合并?
- Windows与订单有合并是否有合并?
- 是否可以告诉Spark不要合并?
在UDAF发生在哪些情况下/Conditons合并?
merge
当汇总函数的部分应用("映射侧聚集")在洗牌后合并("减少侧聚集")。
Windows与订单有合并是否有合并?
在当前实现中从未。目前,窗口函数只是花哨的groupByKey
,并且没有部分聚合。这当然是实施细节,并且可能会在将来不进一步通知。
是否可以告诉Spark不要合并?
不是。但是,如果数据已经通过聚合密钥对数据进行分区,则不需要merge
,并且仅使用combine
。
最后:
与30天的当前交易在同一国家进行了多少次信用卡交易?
不需要UDAFs
或窗口功能。我可能会用o.a.s.sql.functions.window
创建翻滚窗口,按用户,乡村和窗口进行汇总,然后与输入一起加入。