在RDD中的某些条件下替换数据并过滤掉不需要的RDD



我有RDD,看起来像这样:

[((String, String, String), (String, String))]

示例数据如下所示:

((10,1,a),(x,3))
((10,2,b),(y,5))
((11,2,b),
((11,3,c),(z,4))

因此,如果键内第二个字符串的值是 2 或 3,请将其替换为 2-3,如果它是 1 或者如果 rdd 与第三个字符串类似,请删除该 rdd。

所以预期的输出是这样的:

((10,2-3,b),(y,5))
((11,2-3,c),(z,4))

给定输入数据为

val rdd = spark.sparkContext.parallelize(Seq(
  (("10","1","a"),("x","3")),
  (("10","2","b"),("y","5")),
  (("11","2","b"),()),
  (("11","3","c"),("z","4"))
))

您可以执行以下操作来获得所需的输出,如下所示

rdd.filter(x => x._1._2 != "1").filter(x => x._2 != ()).map(x => {
  if(x._1._2 == "2" || x._1._2 == "3") ((x._1._1, "2-3", x._1._3), x._2)
  else ((x._1._1, x._1._2, x._1._3), x._2)
})

您的输出将是

((10,2-3,b),(y,5))
((11,2-3,c),(z,4))

感谢慈善家指出它必须是String而不是Int

最新更新