在Spark RDD中展开选项



我试图在Spark RDD上重现scala-vanilla集合的flattenOption的行为。例如:

Seq(Some(1), None, Some(2), None, None).flatten
> Seq[Int] = List(1, 2)
// None are removed, and Some are unwrapped
sc.parallelize(Seq(Some(1), None, Some(2))).flatten.collect()
> error: value flatten is not a member of org.apache.spark.rdd.RDD[Option[Int]]
>      sc.parallelize(Seq(Some(1), None, Some(2), None, None)).flatten.collect()
// The function does not exist for RDDs

当然,以下方法有效,但这意味着在过滤之前进行收集,例如在一台机器上收集更大的集合。

sc.parallelize(Seq(Some(1), None, Some(2))).collect().flatten
> Array[Int] = Array(1, 2)

我找到的解决方案是

sc.parallelize(Seq(Some(1), None, Some(2))).filter(_.isDefined).map(_.get).collect()

但这还不是很清楚。有更干净的方法吗?

您可以简单地使用身份函数执行flatMap

sc.parallelize(Seq(Some(1), None, Some(2))).flatMap(x => x).collect
// res1: Array[Int] = Array(1, 2)

您可以在容器类上定义一个Extension方法,以便于扁平化并与您想要使用的编码样式保持一致。

通常在Scala中使用隐式类来实现这一点

您也可以使用映射并传入一个平坦的函数。

最新更新