2 个 rdd 之间的 scala 字转换操作



有一种数据格式有两列,然后每列用'\t'分隔。第一列是数字,第二列是含义。每列的数据都是字符串

111  A
112  B
113  C
114  D
115  E
116  F
117  G
118  H
...

其他数据也有两列,每列的数据都是字符串。格式是这样的。

111  112:0.75,114:0.43,117:0.21
112  113:0.67,114:0.48,115:0.34,116:0.12
113  114:0.33,118:0.12
...

然后我需要将第二个数据的数量转换为其特定含义。结果如下:

 A  B:0.75,D:0.43,G:0.21
 B  C:0.67,D:0.48,E:0.34,F:0.12
 ...

PS:这些数据格式是字符串!

如何编码?然后,如果第一个数据 rdd1 具有少量数据,则第二个数据 rdd2 具有大量数据。在Spark集群上运行,是否需要使用广播,具体方法是什么?谢谢你的回答

这是一种方法。假设您的两个RDDs都是 RDD[(String, String)] 型,并且您的第一个RDD较小。

//create your first RDD
val rdd1: RDD[(String, String)] = sc.parallelize(Seq(
  ("111", "A"),
  ("112", "B"),
  ("113", "C"),
  ("114", "D"),
  ("115", "E"),
  ("116", "F"),
  ("117", "G")))
//as this rdd is small so collect it and convert it to a map
val mapRdd1: Map[String, String] = rdd1.collect.toMap
//broadcast this map to all executors
val bRdd = sc.broadcast(mapRdd1)
//create your second rdd
val rdd2: RDD[(String, String)] = sc.parallelize(Seq(
  ("111", "112:0.75,114:0.43,117:0.21"),
  ("112", "113:0.67,114:0.48,115:0.34,116:0.12")))
val result: RDD[(String, String)] = rdd2.map(x => (x._1, //keep first string as it is
  x._2.split(",").map(a => a.split(":")) //split second string for the required transformations
    //fetch the value from the bradcasted map
    .map(t => (bRdd.value(t.head), t.last)).mkString(" ")))
result.foreach(println(_))
//output
//(111,(B,0.75) (D,0.43) (G,0.21))
//(112,(C,0.67) (D,0.48) (E,0.34) (F,0.12))

这假设rdd2的所有含义都存在于您的第一RDD中。如果没有,则在从地图获取值时使用 bRdd.value.getOrElse(t.head,"DEFAULT_VALUE") .

最新更新