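I'm trying to solve a problem with cogroup, but I really don't know how to go about it.
In the example below I have two RDDs with different keys. Is it possible, using cogroup, to extract only the valid entries of data1, i.e. those whose key starts with the same character as a key in data2?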
val data1 = sc.parallelize(Seq(("aa", 1), ("ba", 2), ("bc", 2), ("b", 3), ("c", 1)))
val data2 = sc.parallelize(Seq(("a", 3), ("b", 5)))
val cogroupRdd: RDD[(String, (Iterable[Int], Iterable[Int]))] = data1.cogroup(data2)
/* List(
(ba,(CompactBuffer(2),CompactBuffer())),
(bc,(CompactBuffer(2),CompactBuffer())),
(a,(CompactBuffer(),CompactBuffer(3))),
(b,(CompactBuffer(3),CompactBuffer(5))),
(c,(CompactBuffer(1),CompactBuffer())),
(aa,(CompactBuffer(1),CompactBuffer()))
) */
The result should be Array(("aa", 1), ("ba", 2), ("bc", 2), ("b", 3))
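I solved it by using broadcast(), as @mrsrinivas suggested. But broadcast() is not suitable for large data, because it collects all of data2's keys on the driver.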
val bcast = sc.broadcast(data2.map(_._1).collect())
val result = data1.filter(r => bcast.value.contains(myFuncOper(r._1)))
Is there a way to solve this with cogroup and functional operations?
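You can use cogroup after re-keying data1 by the part of its key that should match data2's keys (here, the first character), then use filter and flatMap to drop the entries with no match and "restructure" the data: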
val result: RDD[(String, Int)] = data1
.keyBy(_._1.substring(0, 1)) // key by first character
.cogroup(data2)
.filter { case (_, (_, data2Values)) => data2Values.nonEmpty }
.flatMap { case (_, (data1Values, _)) => data1Values }
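To check the logic without a Spark cluster, the same key-by / cogroup / filter / flatMap steps can be emulated on plain Scala collections. This is only a sketch: `cogroupLocal` is a hypothetical helper written for this illustration, not part of the Spark API, and it assumes every key in data1 is non-empty (otherwise `substring(0, 1)` throws).

```scala
object CogroupSketch {
  // Hypothetical local stand-in for RDD.cogroup: for every key present on
  // either side, collect the values from the left and from the right.
  def cogroupLocal[K, A, B](left: Seq[(K, A)], right: Seq[(K, B)]): Map[K, (Seq[A], Seq[B])] = {
    val keys = (left.map(_._1) ++ right.map(_._1)).distinct
    keys.map { k =>
      val ls = left.filter(_._1 == k).map(_._2)
      val rs = right.filter(_._1 == k).map(_._2)
      (k, (ls, rs))
    }.toMap
  }

  val data1 = Seq(("aa", 1), ("ba", 2), ("bc", 2), ("b", 3), ("c", 1))
  val data2 = Seq(("a", 3), ("b", 5))

  val result: Seq[(String, Int)] =
    cogroupLocal(data1.map(r => (r._1.substring(0, 1), r)), data2) // key by first character
      .toSeq
      .filter { case (_, (_, data2Values)) => data2Values.nonEmpty } // keep keys that exist in data2
      .flatMap { case (_, (data1Values, _)) => data1Values }         // restore the original records
}
```

The order of the records in `result` is not guaranteed, which is also true of the RDD version, so compare it as a set.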
Short version:
val result = data1
.flatMap(x => x._1.split("").map(y => (y, x)))
.join(data2)
.map(x => x._2._1)
.distinct
In detail:
flatMap(x => x._1.split("").map(y => (y, x))) pairs every character of the key with the original record, which gives
List(
(a, (aa, 1)),
(a, (aa, 1)),
(b, (ba, 2)),
(a, (ba, 2)),
(b, (bc, 2)),
(c, (bc, 2)),
(b, (b, 3)),
(c, (c, 1))
)
join(data2)
List(
(a, ((aa, 1), 3)),
(a, ((aa, 1), 3)),
(a, ((ba, 2), 3)),
(b, ((ba, 2), 5)),
(b, ((bc, 2), 5)),
(b, ((b, 3), 5))
)
Now we are only interested in the distinct inner pairs (the original data1 records), which we get with map(x => x._2._1).distinct
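Note that splitting the key into all of its characters matches on any character, not only the first one. On the sample data both approaches give the same result, but they can differ on other inputs. The sketch below shows this on plain Scala collections; the extra record ("ca", 9) is a made-up addition for illustration, and the `filter` on `keys2` only stands in for `join(data2)` (a fair substitute here because data2's keys are unique).

```scala
object MatchModes {
  // Sample data from the question plus one hypothetical record, ("ca", 9).
  val data1 = Seq(("aa", 1), ("ba", 2), ("bc", 2), ("b", 3), ("c", 1), ("ca", 9))
  val keys2 = Set("a", "b") // the keys of data2

  // First-character matching, as in the keyBy + cogroup approach.
  val byFirstChar: Seq[(String, Int)] =
    data1.filter { case (k, _) => keys2.contains(k.take(1)) }

  // Any-character matching, as in the flatMap + join + distinct approach.
  val byAnyChar: Seq[(String, Int)] = data1
    .flatMap(x => x._1.toSeq.map(c => (c.toString, x))) // one pair per character
    .filter { case (k, _) => keys2.contains(k) }        // stand-in for join(data2)
    .map(_._2)
    .distinct
}
```

Here `byFirstChar` leaves out ("ca", 9) while `byAnyChar` keeps it, because "ca" contains an "a". Also keep in mind that `distinct` collapses records that are genuinely duplicated in data1.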