何时合并发生在用户定义的汇总函数中,udaf在Spark中



我想知道在哪种情况下,火花将作为UDAF函数的一部分进行合并。

动机:我在Spark Project的窗口上使用了许多UDAF功能。我通常想回答一个问题:

与30天的当前交易在同一国家进行了多少次信用卡交易?

窗口将从当前事务开始,但不会在计数中包含它。它需要当前交易的价值才能知道过去30天内要计算哪个国家。

val rollingWindow = Window
      .partitionBy(partitionByColumn)
      .orderBy(orderByColumn.desc)
      .rangeBetween(0, windowSize)
df.withColumn(
  outputColumnName,
  customUDAF(inputColumn, orderByColumn).over(rollingWindow))

我写了我的Customudaf来进行计数。我始终使用.orderBy(orderByColumn.desc),并且由于.desc,当前交易在计算过程中首先出现在窗口中。

UDAF函数需要实现merge函数,该功能在并行计算中合并了两个中间聚合缓冲区。如果发生任何合并,我的current transaction对于不同的缓冲区可能不相同,UDAF的结果将不正确。

我编写了一个UDAF函数,该功能计算我的数据集中的合并数量,并且仅保留与当前事务的窗口中的第一个交易。

 class FirstUDAF() extends UserDefinedAggregateFunction {
  def inputSchema = new StructType().add("x", StringType)
    .add("y", StringType)
  def bufferSchema = new StructType()
    .add("first", StringType)
    .add("numMerge", IntegerType)
  def dataType = new StructType()
    .add("firstCode", StringType)
    .add("numMerge", IntegerType)
  def deterministic = true
  def initialize(buffer: MutableAggregationBuffer) = {
    buffer(0) = ""
    buffer(1) = 1
  }
  def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    if (buffer.getString(0) == "")
      buffer(0) = input.getString(0)
  }
  def merge(buffer1: MutableAggregationBuffer, buffer2: Row) = {
    buffer1(1) = buffer1.getInt(1) + buffer2.getInt(1)
  }
  def evaluate(buffer: Row) = buffer
}

当我用Spark 2.0.1在具有16 CPU的本地主机上运行时,从来没有任何合并,并且窗口中的第一个交易始终是当前事务。这就是我要的。在不久的将来,我将在X100较大的数据集和实际分布式Spark群集上运行代码,并想知道是否可以在此处进行合并。

问题:

  • 在UDAF发生在哪些情况下/Conditons合并?
  • Windows与订单有合并是否有合并?
  • 是否可以告诉Spark不要合并?

在UDAF发生在哪些情况下/Conditons合并?

merge当汇总函数的部分应用("映射侧聚集")在洗牌后合并("减少侧聚集")。

Windows与订单有合并是否有合并?

当前实现中从未。目前,窗口函数只是花哨的groupByKey,并且没有部分聚合。这当然是实施细节,并且可能会在将来不进一步通知。

是否可以告诉Spark不要合并?

不是。但是,如果数据已经通过聚合密钥对数据进行分区,则不需要merge,并且仅使用combine

最后:

与30天的当前交易在同一国家进行了多少次信用卡交易?

不需要UDAFs或窗口功能。我可能会用o.a.s.sql.functions.window创建翻滚窗口,按用户,乡村和窗口进行汇总,然后与输入一起加入。

最新更新