使用平面地图组时的类型问题



>我正在将 spark-1.6 rdd 转换为 spark-2.x 数据集

原始代码是:

val sample_data : Dataset[(Int, Array[Double])]
val samples : Array[Array[Array[Double]]] = sample_data.rdd
  .groupBy(x => x._1)
  .map(x => {
   val (id: Int, points: Iterable[(Int, Array[Double])]) = x
   val data1 = points.map(x => x._2).toArray
   data1
}).collect()

sample_data.rdd不再有效,因此我尝试使用数据集执行相同的操作。新方法使用flatMapGroups

val sample_data : Dataset[(Int, Array[Double])]
val samples : Array[Array[Array[Double]]] = sample_data
  .groupByKey(x => x._1)
  .flatMapGroups ( (id: Int, points: Iterable[(Int, Array[Double])]) =>
    Iterator(points.map((x:Int, y:Array[Double]) => y)).toList
  ).collect()

给出的错误是:

错误:(36, 25( 具有替代项的重载方法值映射:[B, that](f: ((Int, Array[Double](( => B((隐式 bf: scala.collection.generic.CanBuildFrom[Iterable[(Int, Array[Double](],B,that](That [B](f: ((Int, Array[Double](( => B(迭代器[B]不能应用于((Int,数组[双精度](=> 数组[双倍]( Iterator(points.map((x:Int, y:Array[Double]( => y((.toList

您能否提供一个如何使用flatMapGroups以及如何理解给定错误的示例?

points实际上是一个Iterator,但你把它投射到一个Iterable,所以编译器告诉你把它变成一个Iterator

这是您要执行的操作:

val samples: Array[Array[Array[Double]]] = sample_data
    .groupByKey(_._1)
    .flatMapGroups((id: Int, points: Iterator[(Int, Array[Double])]) =>
        Iterator(points.map(_._2).toArray)
    ).collect()

在迭代器中重新包装并没有为你服务,所以你可以像这样使用 mapGroups :

.mapGroups((_, points) => points.map(_._2).toArray)

但是,在这两种情况下,数组 [Array[_]] 都没有编码器。查看此处了解更多详情。

因此,要么自己实现隐式编码器(现有编码器(,要么坚持使用 RDD 接口。

相关内容

  • 没有找到相关文章