将reduceByKey从Spark转换为Flink



如何将此示例scala spark代码转换为apache-flink?

reduceByKey( (x, y) => (x._1 + y._1, ( (x._2) ++ y._2) ) )

我意识到reduceByKey并不存在于flink中,但它表明了我正在努力实现的目标。

谢谢你的帮助!

与Spark不同,Flink不需要键值对来执行reduce、join和coGroup操作。它可以直接在任何类型上执行它们,例如POJO、元组或用户类型。你必须向Flink提供的是它必须分组的字段。这可以是提取关键字、逻辑索引或字段名称的函数。当您调用reduce操作时,整个对象将被赋予reduce函数,而不仅仅是值部分。

因此,假设你有一个input: DataSet[(K, (T, List[U]))],其中K是密钥类型,那么你的reduce函数看起来像:

input.groupBy(0).reduce{
  (left: (K, (T, List[U])), right: (K, (T, List[U]))) =>
    val (key, (leftValue1, leftValue2)) = left
    val (_, (rightValue1, rightValue2)) = right
    (key, (leftValue1 + rightValue1, leftValue2 ++ rightValue2))
}

为了便于理解,我还为匿名函数提供了类型注释。但这并不是必须的。

更新

这是Humberto特定用例的解决方案,假设输入字段由3个条目的行组成,空格分隔,第三个条目是整数:

val input = env.readCsvFile[(String, String, Int)](filePath, lineDelimiter = "n", fieldDelimiter = " ")
val result = input
  .map (element => (element._1, element._3, Map(element._2 -> element._3)))
  .groupBy(0)
  .reduce{
    (left, right) =>
      val (key, left1, left2) = left
      val (_, right1, right2) = right
      (key, left1 + right1, left2 ++ right2)
  }

reduceByKey基本上将函数(V, V) => V应用于按键分组的(K, V)键值对的值。的等效"纯标量"实现

reduceByKey( (v1, v2) => (v1._1 + v2._1, ( (v1._2) ++ v2._2) ) )

可能是:

groupBy(_._1).mapValues(_.values.reduce((v1, v2) => ...))

(如果我做对了)


一个可能的flink实现可能是:

groupBy(0).reduce { (v1, v2) => (v1._1 + v2._1, ( (v1._2) ++ v2._2) ) }

按键分组,然后按应用于的函数减少。


在@Till解释后编辑

然后调用reduce操作时,整个对象给出了reduce函数,而不仅仅是value部分。

groupBy(0).map(_._2).reduce { (v1, v2) => (v1._1 + v2._1, ( (v1._2) ++ v2._2) ) }
// or the ugly:
groupBy(0).reduce { (kv1, kv2) => (kv1._2._1 + kv2._2._1, ( (kv1._2._2) ++ kv2._2._2) ) }

相关内容

  • 没有找到相关文章

最新更新