数据由两列组成
A B
A C
A D
B A
B C
B D
B E
C A
C B
C D
C E
D A
D B
D C
D E
E B
E C
E D
在第一行中,将其视为 A 是 B 的朋友,依此类推。 我如何找到他们共同的朋友?
(A,B) -> (C D)
意思是 A 和 B 有共同的朋友 C 和 D。我接近做一个 groupByKey,结果如下。
(B,CompactBuffer(A, C, D, E))
(A,CompactBuffer(B, C, D))
(C,CompactBuffer(A, B, D, E))
(E,CompactBuffer(B, C, D))
(D,CompactBuffer(A, B, C, E))
代码:
val rdd: RDD[String] = spark.sparkContext.textFile("twocols.txt")
val splitrdd: RDD[(String, String)] = rdd.map { s =>
var str = s.split(" ")
new Tuple2(str(0), str(1))
}
val group: RDD[(String, Iterable[String])] = splitrdd.groupByKey()
group.foreach(println)
首先swap
元素:
val swapped = splitRDD.map(_.swap)
然后自行加入并返回swap
:
val shared = swapped.join(swapped).map(_.swap)
最后过滤掉重复项(如果需要(并groupByKey
:
shared.filter { case ((x, y), _) => x < y }.groupByKey
这只是一个丑陋的尝试:
假设你已经将你的两列转换为Array[Array[String]]
(或List[List[String]]
,它真的是一样的(,比如
val pairs=Array(
Array("A","B"),
Array("A","C"),
Array("A","D"),
Array("B","A"),
Array("B","C"),
Array("B","D"),
Array("B","E"),
Array("C","A"),
Array("C","B"),
Array("C","D"),
Array("C","E"),
Array("D","A"),
Array("D","B"),
Array("D","C"),
Array("D","E"),
Array("E","B"),
Array("E","C"),
Array("E","D")
)
定义要为其查找其共同朋友的组:
val group=Array("C","D")
以下将找到您小组中每个成员的好友
val friendsByMemberOfGroup=group.map(
i => pairs.filter(x=> x(1) contains i)
.map(x=>x(0))
)
例如,pairs.filter(x=>x(1) contains "C").map(x=>x(0))
返回从第二列获取"C"
的"C"
的好友,从第一列获取其友元:
scala> pairs.filter(x=> x(1) contains "C").map(x=>x(0))
res212: Array[String] = Array(A, B, D, E)
下面的循环将找到您小组中所有成员的共同朋友
var commonFriendsOfGroup=friendsByMemberOfGroup(0).toSet
for(i <- 1 to friendsByMemberOfGroup.size-1){
commonFriendsOfGroup=
commonFriendsOfGroup.intersect(friendsByMemberOfGroup(i).toSet)
}
所以你得到
scala> commonFriendsOfGroup.toArray
res228: Array[String] = Array(A, B, E)
如果您将组更改为val group=Array("A","B","E")
并应用前面的行,那么您将得到
scala> commonFriendsOfGroup.toArray
res230: Array[String] = Array(C, D)
从上次中断的地方继续:
val group: RDD[(String, Iterable[String])] = splitrdd.groupByKey()
val group_map = group.collectAsMap
val common_friends = group
.flatMap{case (x, friends) =>
friends.map{y =>
((x,y),group_map.get(y).get.toSet.intersect(friends.toSet))
}
}
scala> common_friends.foreach(println)
((B,A),Set(C, D))
((B,C),Set(A, D, E))
((B,D),Set(A, C, E))
((B,E),Set(C, D))
((D,A),Set(B, C))
((D,B),Set(A, C, E))
((D,C),Set(A, B, E))
((D,E),Set(B, C))
((A,B),Set(C, D))
((A,C),Set(B, D))
((A,D),Set(B, C))
((C,A),Set(B, D))
((C,B),Set(A, D, E))
((C,D),Set(A, B, E))
((C,E),Set(B, D))
((E,B),Set(C, D))
((E,C),Set(B, D))
((E,D),Set(B, C))
注意:这假设您的数据在两个方向上都有关系,如您的示例所示:(A B 和 B A(。如果不是这种情况,您需要添加一些代码来处理group_map.get(y)
可能会返回None
的事实。
所以我最终在客户端这样做了。不要这样做
val arr: Array[(String, Iterable[String])] = group.collect()
//arr.foreach(println)
var arr2 = scala.collection.mutable.Set[((String, String), List[String])]()
for (i <- arr)
for (j <- arr)
if (i != j) {
val s1 = i._2.toSet
val s2 = j._2.toSet
val s3 = s1.intersect(s2).toList
//println(s3)
val pair = if (i._1 < j._1) (i._1, j._1) else (j._1, i._1)
arr2 += ((pair, s3))
}
arr2.foreach(println)
结果是
((B,E),List(C, D))
((A,C),List(B, D))
((A,B),List(C, D))
((A,D),List(B, C))
((B,D),List(A, C, E))
((C,D),List(A, B, E))
((B,C),List(A, D, E))
((C,E),List(B, D))
((D,E),List(B, C))
((A,E),List(B, C, D))
我想知道我是否可以使用 Spark 中的转换来做到这一点。