在Cogrouped Second Tuple CompactBuffer中进行迭代



我有两个键值对rdd's A and B,数据看起来像

A={(1,(1,john,CA)),
(2,(2,steve,NY)),
(3,(3,jonny,AL)),
(4,(4,Evan,AK)),
(5,(5,Tommy,AZ))} 
B={(1,(1,john,WA)),
(1,(1,john,FL)),
(1,(1,john,GA)),
(2,(2,steve,NY)),
(3,(3,jonny,AL)),
(4,(4,Evan,AK)),
(5,(5,Tommy,AZ))} 

Rdd B 对键1有三个值,因此在应用cogroup

c = A.cogroup(B).filter { x => ((x._2._1) != (x._2._2)) }.collect() we get 
c = {(1,CompactBuffer(1,john,CA),CompactBuffer(1,john,WA,1,john,FL,1,john,GA)}

在两个变量中收集两个CompactBuffers,如下所示

d = c.map(tuple =>(tuple._2._1.mkString("")))
e = c.map(tuple =>(tuple._2._2.mkString("")))

迭代de

如下所示
for(x <-d)
{
  for(y <-e){
  println(x +" source and destination "+ y)
  }
}

预期产出

1,john,CA  source and destination  1,john,WA
1,john,CA  source and destination  1,john,FL
1,john,CA  source and destination  1,john,GA

接收的输出

1,john,CA source and destination 1,john,WA,1,john,FL,1,john,GA

我应该为迭代Second Tuple elements i.e Second Compactbuffer()更改什么

如果您有任何问题或澄清,请告诉我。

正如评论中所建议的,mkString正在将数组转换为一个元素的数组。您也可以通过将延迟迭代器转换为数组然后迭代它来评估它:

c.foreach { x =>
    val arr1 = x._2._1.toArray
    val arr2 = x._2._2.toArray
    for (e1 <- arr1 ) {
        for (e2 <- arr2 ) {
            println (e1 + "-----------" + e2 ) 
        }
    }
 }
(1,john,CA)-----------(1,john,WA)
(1,john,CA)-----------(1,john,FL)
(1,john,CA)-----------(1,john,GA)

使用您编写的内容,您可以将mkString替换为flatMap操作来评估迭代器:

d = c.flatMap(tuple =>tuple._2._1)
e = c.flatMap(tuple =>tuple._2._2)

然后继续您的for循环。

相关内容

  • 没有找到相关文章

最新更新