>我正在将 spark-1.6 rdd 转换为 spark-2.x 数据集
原始代码是:
val sample_data : Dataset[(Int, Array[Double])]
val samples : Array[Array[Array[Double]]] = sample_data.rdd
.groupBy(x => x._1)
.map(x => {
val (id: Int, points: Iterable[(Int, Array[Double])]) = x
val data1 = points.map(x => x._2).toArray
data1
}).collect()
该sample_data.rdd
不再有效,因此我尝试使用数据集执行相同的操作。新方法使用flatMapGroups
val sample_data : Dataset[(Int, Array[Double])]
val samples : Array[Array[Array[Double]]] = sample_data
.groupByKey(x => x._1)
.flatMapGroups ( (id: Int, points: Iterable[(Int, Array[Double])]) =>
Iterator(points.map((x:Int, y:Array[Double]) => y)).toList
).collect()
给出的错误是:
错误:(36, 25( 具有替代项的重载方法值映射:[B, that](f: ((Int, Array[Double](( => B((隐式 bf: scala.collection.generic.CanBuildFrom[Iterable[(Int, Array[Double](],B,that](That [B](f: ((Int, Array[Double](( => B(迭代器[B]不能应用于((Int,数组[双精度](=> 数组[双倍]( Iterator(points.map((x:Int, y:Array[Double]( => y((.toList
您能否提供一个如何使用flatMapGroups以及如何理解给定错误的示例?
points
实际上是一个Iterator
,但你把它投射到一个Iterable
,所以编译器告诉你把它变成一个Iterator
。
这是您要执行的操作:
val samples: Array[Array[Array[Double]]] = sample_data
.groupByKey(_._1)
.flatMapGroups((id: Int, points: Iterator[(Int, Array[Double])]) =>
Iterator(points.map(_._2).toArray)
).collect()
在迭代器中重新包装并没有为你服务,所以你可以像这样使用 mapGroups :
.mapGroups((_, points) => points.map(_._2).toArray)
但是,在这两种情况下,数组 [Array[_]] 都没有编码器。查看此处了解更多详情。
因此,要么自己实现隐式编码器(现有编码器(,要么坚持使用 RDD
接口。