我创建了一个这种类型的RDD:
RDD[(Long, Iterable[(String, Double)])]
这里的第一个长参数是地图中原始点的 ID,第二个字符串参数是另一个点的 ID,它从一个固定数据集以字符串格式表示。第三个参数 Double 是两点之间的距离。
现在,我想找到给定点的最小距离点。所以,我想把这个RDD
转换为RDD[(Long, (String, Double))]
。因此,我将拥有最接近给定点集的所有点。
我目前拥有的输出是这样的
(4516831,CompactBuffer((POI1,2632.0690038389157), (POI2,2632.0690038389157), (POI3,666.9416656643995), (POI4,1450.3241112528403)))
(4516915,CompactBuffer((POI1,2632.0690038389157), (POI2,2632.0690038389157), (POI3,666.9416656643995), (POI4,1450.3241112528403)))
我尝试编写的代码是这样的。
`
// groupData returns output in the format RDD[(Long, (String, Double))]
val combinedData = groupData(dataRdd, poiRdd)
.groupByKey()
.map(row => {
var min:Double = 9999999
for(value <- row._2) yield
if (value._2 < min) {
min = value._2
} else min
(row._1, row._2.filter(r => r._2 == min))
}).foreach(println)
// output like this: (4516915,List((POI3,666.9416656643995)))`
上面的代码按照要求工作正常,但我认为它真的很糟糕,尤其是我根据需要返回输出的最后一行。我又在那里过滤了。一定有更好的方法。我怎样才能做到这一点?如果这个问题有点含糊不清,我很抱歉。
谢谢。
我认为您的示例输入数据是来自groupByKey
的中间数据。 如果是这样,假设您的原始RDD如下所示:
val rdd = sc.parallelize(Seq(
(4516831, ("POI1", 2632.0690038389157)),
(4516831, ("POI2", 2632.0690038389157)),
(4516831, ("POI3", 666.9416656643995)),
(4516831, ("POI4", 1450.3241112528403)),
(4516915, ("POI1", 2632.0690038389157)),
(4516915, ("POI2", 2632.0690038389157)),
(4516915, ("POI3", 666.9416656643995)),
(4516915, ("POI4", 1450.3241112528403))
))
val groupedRDD = rdd.groupByKey
// groupedRDD: org.apache.spark.rdd.RDD[(Int, Iterable[(String, Double)])] = ...
然后groupedRDD
应该具有与示例输入数据完全相同的数据。
然后,您可以处理分组RDD的mapValues
以使用reduce
捕获最小值,并根据输出要求将减少的结果包装在List
中:
val resultRDD1 = groupedRDD.mapValues( _.reduce{
(acc, x) => if (x._2 < acc._2) x else acc
}).map{ case (k, v) => (k, List(v))}
resultRDD1.collect
// res1: Array[(Int, List[(String, Double)])] = Array(
// (4516915,List((POI3,666.9416656643995))), (4516831,List((POI3,666.9416656643995)))
// )
但是,如果您可以简单地从原始RDD数据中工作,而不是使用groupByKey
,则使用性能更高的reduceByKey
,如下所示,则会更有效:
val resultRDD2 = rdd.reduceByKey(
(acc, x) => if (x._2 < acc._2) x else acc
).map{ case (k, v) => (k, List(v))}
resultRDD2.collect
// res2: Array[(Int, List[(String, Double)])] = Array(
// (4516915,List((POI3,666.9416656643995))), (4516831,List((POI3,666.9416656643995)))
// )