I have two key-value pair RDDs, A and B, and the data looks like this:
A={(1,(1,john,CA)),
(2,(2,steve,NY)),
(3,(3,jonny,AL)),
(4,(4,Evan,AK)),
(5,(5,Tommy,AZ))}
B={(1,(1,john,WA)),
(1,(1,john,FL)),
(1,(1,john,GA)),
(2,(2,steve,NY)),
(3,(3,jonny,AL)),
(4,(4,Evan,AK)),
(5,(5,Tommy,AZ))}
RDD B has three values for key 1, so after applying cogroup
c = A.cogroup(B).filter { x => ((x._2._1) != (x._2._2)) }.collect()
we get
c = {(1,CompactBuffer(1,john,CA),CompactBuffer(1,john,WA,1,john,FL,1,john,GA))}
I collect the two CompactBuffers into two variables, as shown below:
d = c.map(tuple =>(tuple._2._1.mkString("")))
e = c.map(tuple =>(tuple._2._2.mkString("")))
Then I iterate over d and e:
for(x <-d)
{
for(y <-e){
println(x +" source and destination "+ y)
}
}
Expected output:
1,john,CA source and destination 1,john,WA
1,john,CA source and destination 1,john,FL
1,john,CA source and destination 1,john,GA
Actual output:
1,john,CA source and destination 1,john,WA,1,john,FL,1,john,GA
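What should I change so that I iterate over the individual elements of the second tuple element, i.e. the second CompactBuffer?
Please let me know if you have any questions or need clarification.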
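As suggested in the comments, mkString joins all the elements of a buffer into a single string, so after the map each of d and e holds just one string per key. Instead, you can evaluate the lazy iterable by converting it to an array and then iterating over it: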
c.foreach { x =>
val arr1 = x._2._1.toArray
val arr2 = x._2._2.toArray
for (e1 <- arr1 ) {
for (e2 <- arr2 ) {
println (e1 + "-----------" + e2 )
}
}
}
(1,john,CA)-----------(1,john,WA)
(1,john,CA)-----------(1,john,FL)
(1,john,CA)-----------(1,john,GA)
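To make the difference concrete, here is a minimal sketch that runs without Spark. It assumes a plain Seq can stand in for CompactBuffer, and the object and method names (MkStringVsElements, joined, pairs) are made up for illustration only.

```scala
// Plain-Scala sketch; Seq stands in for Spark's CompactBuffer (an assumption
// made so the example runs without a Spark dependency).
object MkStringVsElements {
  type Row = (Int, String, String)

  // Shape of one cogroup result: (key, (values from A, values from B)).
  val grouped: (Int, (Seq[Row], Seq[Row])) =
    (1, (Seq((1, "john", "CA")),
         Seq((1, "john", "WA"), (1, "john", "FL"), (1, "john", "GA"))))

  // mkString joins the whole buffer into ONE String.
  def joined(rows: Seq[Row]): String = rows.mkString("")

  // Pair every row of the first buffer with every row of the second.
  def pairs(g: (Int, (Seq[Row], Seq[Row]))): Seq[String] =
    for {
      a <- g._2._1
      b <- g._2._2
    } yield s"$a source and destination $b"

  def main(args: Array[String]): Unit = {
    println(joined(grouped._2._2))  // a single line holding all three rows
    pairs(grouped).foreach(println) // three lines, one per row of B
  }
}
```

The point of the sketch is only that joined returns one value per buffer while pairs returns one value per combination of rows, which is the behaviour the question asks for.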
Keeping what you already wrote, you can replace the map with mkString by a flatMap operation to evaluate the iterators:
d = c.flatMap(tuple =>tuple._2._1)
e = c.flatMap(tuple =>tuple._2._2)
Then continue with your for loop.
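One thing to watch for, as far as I can tell: if more than one key survives the filter, flattening d and e separately loses track of which rows belong to which key, so the nested loop would also pair rows across different keys. Below is a minimal sketch of flattening both sides together per key. It is plain Scala with Seq standing in for CompactBuffer, and the names (PairsPerKey, pairsPerKey) and the second key's data are hypothetical, not taken from the question.

```scala
// Plain-Scala sketch; Seq stands in for Spark's CompactBuffer.
object PairsPerKey {
  type Row = (Int, String, String)
  type Grouped = (Int, (Seq[Row], Seq[Row]))

  // Hypothetical stand-in for the collected cogroup result, with two keys.
  val c: Array[Grouped] = Array(
    (1, (Seq((1, "john", "CA")), Seq((1, "john", "WA"), (1, "john", "FL")))),
    (2, (Seq((2, "steve", "NY")), Seq((2, "steve", "TX"))))
  )

  // flatMap over the groups so that each source row is paired only with
  // destination rows that share its key.
  def pairsPerKey(groups: Array[Grouped]): Array[(Row, Row)] =
    groups.flatMap { case (_, (left, right)) =>
      for (a <- left; b <- right) yield (a, b)
    }

  def main(args: Array[String]): Unit =
    pairsPerKey(c).foreach { case (a, b) =>
      println(s"$a source and destination $b")
    }
}
```

With the single key from the question, the two approaches give the same pairs. The difference only shows up once several keys are present.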