我试图在Spark RDD上重现scala-vanilla集合的flatten
和Option
的行为。例如:
Seq(Some(1), None, Some(2), None, None).flatten
> Seq[Int] = List(1, 2)
// None are removed, and Some are unwrapped
sc.parallelize(Seq(Some(1), None, Some(2))).flatten.collect()
> error: value flatten is not a member of org.apache.spark.rdd.RDD[Option[Int]]
> sc.parallelize(Seq(Some(1), None, Some(2), None, None)).flatten.collect()
// The function does not exist for RDDs
当然,以下方法有效,但这意味着在过滤之前进行收集,例如在一台机器上收集更大的集合。
sc.parallelize(Seq(Some(1), None, Some(2))).collect().flatten
> Array[Int] = Array(1, 2)
我找到的解决方案是
sc.parallelize(Seq(Some(1), None, Some(2))).filter(_.isDefined).map(_.get).collect()
但这还不是很清楚。有更干净的方法吗?
您可以简单地使用身份函数执行flatMap
:
sc.parallelize(Seq(Some(1), None, Some(2))).flatMap(x => x).collect
// res1: Array[Int] = Array(1, 2)
您可以在容器类上定义一个Extension方法,以便于扁平化并与您想要使用的编码样式保持一致。
通常在Scala中使用隐式类来实现这一点
您也可以使用映射并传入一个平坦的函数。