I have an RDD that looks like this:
[((String, String, String), (String, String))]
Sample data looks like this:
((10,1,a),(x,3))
((10,2,b),(y,5))
((11,2,b),())
((11,3,c),(z,4))
So, if the value of the second string inside the key is 2 or 3, replace it with 2-3; if it is 1, or if the record looks like the third one (with an empty value part), drop that record.
So the expected output is this:
((10,2-3,b),(y,5))
((11,2-3,c),(z,4))
Given the input data as
val rdd = spark.sparkContext.parallelize(Seq(
(("10","1","a"),("x","3")),
(("10","2","b"),("y","5")),
(("11","2","b"),()),
(("11","3","c"),("z","4"))
))
you can do the following to get the desired output:
rdd.filter(x => x._1._2 != "1")   // drop records whose key's second field is "1"
  .filter(x => x._2 != ())        // drop records with an empty value part, like (("11","2","b"),())
  .map { case ((a, b, c), v) =>
    if (b == "2" || b == "3") ((a, "2-3", c), v)   // rewrite "2" / "3" to "2-3"
    else ((a, b, c), v)
  }
Your output will be
((10,2-3,b),(y,5))
((11,2-3,c),(z,4))
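As an aside, the two filters and the map can be fused into a single pass with collect and a partial function; the pattern match on the value side also removes the empty-tuple records without an explicit inequality check. This is a sketch, not part of the original answer, shown here on a plain Seq (RDD exposes the same collect(PartialFunction) transformation); the value side is typed Any because the sample input mixes (String, String) pairs with the empty tuple ().

```scala
// Same transformation in one collect: keep rows whose value really is a
// (String, String) pair and whose key's second field is not "1",
// rewriting "2" / "3" to "2-3" along the way.
val data: Seq[((String, String, String), Any)] = Seq(
  (("10", "1", "a"), ("x", "3")),
  (("10", "2", "b"), ("y", "5")),
  (("11", "2", "b"), ()),
  (("11", "3", "c"), ("z", "4"))
)

val result = data.collect {
  // the pattern (_: String, _: String) fails to match (), so empty
  // value parts are dropped here without a separate filter
  case ((a, b, c), v @ (_: String, _: String)) if b != "1" =>
    val b2 = if (b == "2" || b == "3") "2-3" else b
    ((a, b2, c), v)
}
// result contains (("10","2-3","b"),("y","5")) and (("11","2-3","c"),("z","4"))
```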
Thanks to 慈善家 for pointing out that it must be String rather than Int.