Mapping an RDD to a case (schema) in Spark with Scala



I'm new to Scala and Spark, and I have a small question. I have an RDD with the following schema:

    RDD[((String, String), (Int, Timestamp, String, Int))]

I need to map this RDD into:

    RDD[(Int, String, String, String, Timestamp, Int)]

I wrote the following code for this:

    map { case ((pid, name), (id, date, code, level)) => (id, name, code, pid, date, level) }

This works fine. Now I have another RDD:

    RDD[((String, String), List[(Int, Timestamp, String, Int)])]

I want to convert it, as above, into:

    RDD[(Int, String, String, String, Timestamp, Int)]

How can I do that? I tried the following code, but it doesn't work:

    map {
      case ((pid, name), List(id, date, code, level)) => (id, name, code, pid, date, level)
    }

How can I achieve this?

Is this what you are looking for? Note that your pattern `List(id, date, code, level)` only matches a list of exactly four elements, and binds each whole tuple to a single name, which is why your attempt doesn't do what you want. Instead, flatMap over the RDD and map over the inner list:

    val input: RDD[((String, String), List[(Int, Timestamp, String, Int)])] = ...
    val output: RDD[(Int, String, String, String, Timestamp, Int)] = input.flatMap { case ((pid, name), list) =>
      list.map { case (id, date, code, level) =>
        (id, name, code, pid, date, level)
      }
    }

Or, as a for comprehension:

    val output: RDD[(Int, String, String, String, Timestamp, Int)] = for {
      ((pid, name), list)     <- input
      (id, date, code, level) <- list
    } yield (id, name, code, pid, date, level)
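
For reference, here is a minimal self-contained sketch of the flatMap version; the local SparkSession setup and the sample values are mine, purely for illustration:

    import java.sql.Timestamp
    import org.apache.spark.sql.SparkSession

    object FlattenDemo {
      def main(args: Array[String]): Unit = {
        // Local session purely for this demo.
        val spark = SparkSession.builder().master("local[*]").appName("flatten-demo").getOrCreate()
        val sc = spark.sparkContext

        val ts = Timestamp.valueOf("2020-01-01 00:00:00") // made-up sample timestamp
        val input = sc.parallelize(Seq(
          (("p1", "alice"), List((1, ts, "A", 10), (2, ts, "B", 20)))
        ))

        // One output tuple per element of the inner List.
        val output = input.flatMap { case ((pid, name), list) =>
          list.map { case (id, date, code, level) => (id, name, code, pid, date, level) }
        }

        output.collect().foreach(println)
        // (1,alice,A,p1,2020-01-01 00:00:00.0,10)
        // (2,alice,B,p1,2020-01-01 00:00:00.0,20)

        spark.stop()
      }
    }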

Try flatMapValues: it expands the inner List while keeping the (pid, name) key, and a plain map then reorders the fields. (Note that `list.flatten` does not compile here, because the list elements are tuples, not nested collections.)

    input
      .flatMapValues(identity) // RDD[((String, String), (Int, Timestamp, String, Int))]
      .map { case ((pid, name), (id, date, code, level)) =>
        (id, name, code, pid, date, level)
      }
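
Since the title mentions mapping to a case (schema): the same pattern also works if the target is a case class rather than a Tuple6, which gives the fields names. A sketch, with `Record` as a hypothetical class name:

    import java.sql.Timestamp
    import org.apache.spark.rdd.RDD

    // Hypothetical case class standing in for the target schema.
    case class Record(id: Int, name: String, code: String, pid: String, date: Timestamp, level: Int)

    val records: RDD[Record] = input.flatMap { case ((pid, name), list) =>
      list.map { case (id, date, code, level) => Record(id, name, code, pid, date, level) }
    }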
