如何使用Spark中的功能操作使用应用的cogroup方法



我试图求解应用的cogroup问题。但是我真的不知道...

如下示例中有两个带有不同键的RDD,只有在使用cogroup是相同时,才可以提取有效的data1

val data1 = sc.parallelize(Seq(("aa", 1), ("ba", 2), ("bc", 2), ("b", 3), ("c", 1)))
val data2 = sc.parallelize(Seq(("a", 3), ("b", 5)))
val cogroupRdd: RDD[(String, (Iterable[Int], Iterable[Int]))] = data1.cogroup(data2)
/* List(
    (ba,(CompactBuffer(2),CompactBuffer())), 
    (bc,(CompactBuffer(2),CompactBuffer())), 
    (a,(CompactBuffer(),CompactBuffer(3))), 
    (b,(CompactBuffer(3),CompactBuffer(5))), 
    (c,(CompactBuffer(1),CompactBuffer())), 
    (aa,(CompactBuffer(1),CompactBuffer()))
) */

结果应为Array(("aa", 1), ("ba", 2), ("bc", 2), ("b", 3))

我通过使用 broadcast()如@mrsrinivas所说,解决了这个问题。但是broadcast()不适合大数据。

val bcast = sc.broadcast(data2.map(_._1).collect())
val result = data1.filter(r => bcast.value.contains(myFuncOper(r._1)))

是否有一种使用cogroup和功能操作解决此问题的方法?

您可以在提取匹配data2的密钥的键后使用cogroup,然后使用filtermap删除无匹配的值并"重组"数据:

val result: RDD[(String, Int)] = data1
  .keyBy(_._1.substring(0, 1)) // key by first character
  .cogroup(data2)
  .filter { case (_, (_, data2Values)) => data2Values.nonEmpty }
  .flatMap { case (_, (data1Values, _)) => data1Values }

简短:

val result = data1
  .flatMap(x => x._1.split("").map(y => (y, x)))
  .join(data2)
  .map(x => x._2._1)
  .distinct

详细:

flatMap(x => x._1.split("").map(y => (y, x)))保持

 List(
  (a, (aa, 1)),
  (a, (aa, 1)),
  (b, (ba, 2)),
  (a, (ba, 2)),
  (b, (bc, 2)),
  (c, (bc, 2)),
  (b, (b, 3)),
  (c, (c, 1))
)

join(data2)

之后
List(
  (a, ((aa, 1), 3)),
  (a, ((aa, 1), 3)),
  (a, ((ba, 2), 3)),
  (b, ((ba, 2), 5)),
  (b, ((bc, 2), 5)),
  (b, ((b, 3), 5))
)

现在我们对独特的第二对感兴趣,这可以由map(x => x._2._1).distinct

完成

相关内容

  • 没有找到相关文章

最新更新