如何获取两个RDD[(String, Iterable[String])]的交集



数据由两列组成

A B
A C
A D
B A
B C
B D
B E
C A
C B
C D
C E
D A
D B
D C
D E
E B
E C
E D

在第一行中,将其视为 A 是 B 的朋友,依此类推。 我如何找到他们共同的朋友?

(A,B) -> (C D)

意思是 A 和 B 有共同的朋友 C 和 D。我接近做一个 groupByKey,结果如下。

(B,CompactBuffer(A, C, D, E))
(A,CompactBuffer(B, C, D))
(C,CompactBuffer(A, B, D, E))
(E,CompactBuffer(B, C, D))
(D,CompactBuffer(A, B, C, E))

代码:

val rdd: RDD[String] = spark.sparkContext.textFile("twocols.txt")
val splitrdd: RDD[(String, String)] = rdd.map { s =>
var str = s.split(" ")
new Tuple2(str(0), str(1))
}
val group: RDD[(String, Iterable[String])] = splitrdd.groupByKey()
group.foreach(println)

首先swap元素:

val swapped = splitRDD.map(_.swap)

然后自行加入并返回swap

val shared =  swapped.join(swapped).map(_.swap)

最后过滤掉重复项(如果需要(并groupByKey

shared.filter { case ((x, y), _) => x < y }.groupByKey

这只是一个丑陋的尝试:

假设你已经将你的两列转换为Array[Array[String]](或List[List[String]],它真的是一样的(,比如

val pairs=Array(
Array("A","B"),
Array("A","C"),
Array("A","D"),
Array("B","A"),
Array("B","C"),
Array("B","D"),
Array("B","E"),
Array("C","A"),
Array("C","B"),
Array("C","D"),
Array("C","E"),
Array("D","A"),
Array("D","B"),
Array("D","C"),
Array("D","E"),
Array("E","B"),
Array("E","C"),
Array("E","D")
)

定义要为其查找其共同朋友的组:

val group=Array("C","D")

以下将找到您小组中每个成员的好友

val friendsByMemberOfGroup=group.map(
i => pairs.filter(x=> x(1) contains i)
.map(x=>x(0))
)

例如,pairs.filter(x=>x(1) contains "C").map(x=>x(0))返回从第二列获取"C""C"的好友,从第一列获取其友元:

scala> pairs.filter(x=> x(1) contains "C").map(x=>x(0))
res212: Array[String] = Array(A, B, D, E)

下面的循环将找到您小组中所有成员的共同朋友

var commonFriendsOfGroup=friendsByMemberOfGroup(0).toSet
for(i <- 1 to friendsByMemberOfGroup.size-1){
commonFriendsOfGroup=
commonFriendsOfGroup.intersect(friendsByMemberOfGroup(i).toSet)
}

所以你得到

scala> commonFriendsOfGroup.toArray
res228: Array[String] = Array(A, B, E)

如果您将组更改为val group=Array("A","B","E")并应用前面的行,那么您将得到

scala> commonFriendsOfGroup.toArray
res230: Array[String] = Array(C, D)

从上次中断的地方继续:

val group: RDD[(String, Iterable[String])] = splitrdd.groupByKey()
val group_map = group.collectAsMap
val common_friends = group
.flatMap{case (x, friends) => 
friends.map{y => 
((x,y),group_map.get(y).get.toSet.intersect(friends.toSet))
}
}
scala> common_friends.foreach(println)
((B,A),Set(C, D))
((B,C),Set(A, D, E))
((B,D),Set(A, C, E))
((B,E),Set(C, D))
((D,A),Set(B, C))
((D,B),Set(A, C, E))
((D,C),Set(A, B, E))
((D,E),Set(B, C))
((A,B),Set(C, D))
((A,C),Set(B, D))
((A,D),Set(B, C))
((C,A),Set(B, D))
((C,B),Set(A, D, E))
((C,D),Set(A, B, E))
((C,E),Set(B, D))
((E,B),Set(C, D))
((E,C),Set(B, D))
((E,D),Set(B, C))

注意:这假设您的数据在两个方向上都有关系,如您的示例所示:(A B 和 B A(。如果不是这种情况,您需要添加一些代码来处理group_map.get(y)可能会返回None的事实。

所以我最终在客户端这样做了。不要这样做

val arr: Array[(String, Iterable[String])] = group.collect()
//arr.foreach(println)
var arr2 = scala.collection.mutable.Set[((String, String), List[String])]()
for (i <- arr)
for (j <- arr)
if (i != j) {
val s1 = i._2.toSet
val s2 = j._2.toSet
val s3 = s1.intersect(s2).toList
//println(s3)
val pair = if (i._1 < j._1) (i._1, j._1) else (j._1, i._1)
arr2 += ((pair, s3))
}
arr2.foreach(println)

结果是

((B,E),List(C, D))
((A,C),List(B, D))
((A,B),List(C, D))
((A,D),List(B, C))
((B,D),List(A, C, E))
((C,D),List(A, B, E))
((B,C),List(A, D, E))
((C,E),List(B, D))
((D,E),List(B, C))
((A,E),List(B, C, D))

我想知道我是否可以使用 Spark 中的转换来做到这一点。

相关内容

最新更新