Mapping an RDD tuple that contains a MapPartitionsRDD back out into flat tuples



I'm trying to map the elements inside the MapPartitionsRDD portion of an RDD tuple out against the elements of the outer RDD tuple. I've tried various approaches but keep running into problems (Spark DataFrames / explode etc. are not allowed).

The data I want to map back out looks like this (the contents of reducedRDD):

org.apache.spark.rdd.RDD[(Int, Double, (Int, Double))]
(451,2.7362637362637363,MapPartitionsRDD[28] at )

It should produce a result like this (here you can see the inner collection from the MapPartitionsRDD being mapped against the first two values, giving 4-tuples):

(451,2.7362637362637363,17,0.324)
(451,2.7362637362637363,49,0.846)
(451,2.7362637362637363,4,-0.127)

As mentioned, I have already tried various approaches and looked through related threads on SO, but without success. I was hoping a simple flatMap would do it:

val reversedRDD = reducedRDD.flatMap{case (u, a, (i, d)) => (u, a, i, d)}

But now I get this error (the compiler output below is from a mapPartitions variant of the same pattern match):

constructor cannot be instantiated to expected type;
[error]  found   : (T1, T2, T3)
[error]  required: org.apache.spark.rdd.RDD[(Int, Double, (Int, Double))]
[error] Error occurred in an application involving default arguments.
[error]   val test5 = test2.mapPartitions{case (u, a, (i, d)) => (u, a, i, d)}

reducedRDD is built from the following:

val reducedRDD = userAverages.map( a => (a._1, a._2, globAveDev)) 
val globAveDev = userNormDev.groupBy(_._2).mapValues(_.map(_._3)).map{case (i, r) => (i, (r.sum/r.size).toDouble)} 
val userAverages = userRatingsForUserAve.groupBy(_._1).mapValues(_.map(_._2)).map{case (u, r) => (u, (r.sum/r.size).toDouble)}
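
For reference, the element types I infer from these definitions (an assumption on my part; the post does not state them explicitly) are:

// userAverages : RDD[(Int, Double)]                      -- (user id, user's average rating)
// globAveDev   : RDD[(Int, Double)]                      -- (item id, average deviation)
// reducedRDD   : RDD[(Int, Double, RDD[(Int, Double)])]  -- third field is a whole RDD

Because globAveDev is itself an RDD, every element of reducedRDD carries a reference to that entire RDD, which is why the third field prints as MapPartitionsRDD[28] and cannot be pattern-matched as a plain (Int, Double) pair.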

Thanks in advance!

Update: solved the problem by combining userAverages and globAveDev directly with a cartesian product, instead of trying to destructure the nested RDD:

val userCartesianProd = userAverages.cartesian(globAveDev).map{
case ((k1, v1), (k2, v2)) => (k1, k2, v1, v2)
}
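
For completeness, here is a minimal, self-contained sketch of the cartesian approach; the sample values, the local[*] SparkSession and the object/main wrapper are illustrative additions of mine, and only the cartesian/map logic comes from the post:

import org.apache.spark.sql.SparkSession

object CartesianSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("cartesian-sketch").getOrCreate()
    val sc = spark.sparkContext

    // Toy stand-ins for the two source RDDs (values invented to mirror the question).
    val userAverages = sc.parallelize(Seq((451, 2.7362637362637363)))
    val globAveDev   = sc.parallelize(Seq((17, 0.324), (49, 0.846), (4, -0.127)))

    // Pair every user average with every item deviation, then flatten each pair of
    // pairs into a single 4-tuple (user, item, userAverage, deviation).
    val userCartesianProd = userAverages.cartesian(globAveDev).map {
      case ((k1, v1), (k2, v2)) => (k1, k2, v1, v2)
    }

    userCartesianProd.collect().foreach(println)
    // prints e.g. (451,17,2.7362637362637363,0.324), (451,49,2.7362637362637363,0.846), ...

    spark.stop()
  }
}

Note that this yields (user, item, average, deviation); emitting (k1, v1, k2, v2) instead would match the (user, average, item, deviation) ordering shown earlier in the question.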
