我有一个字符串数据集,我用一个可能失败的函数解析成一个case类的数据集(例如,如果我试图解析的数据不可用)。因此,该函数返回一个Option (Scala)。所以我最终与选项[MyCaseClass]的数据集。
Spark似乎接受该数据集并处理它,但如果解析失败,它不是返回None
,而是返回Some(MyCaseClass(null, null...))
。
下面是这样做的代码示例:
recordsDs
.map { record =>
val maybeArticle = unmarshallArticle(record)
if (maybeArticle.isEmpty) {
println(s"Could not parse record $record into an article.")
}
maybeArticle
}
.filter(_.isDefined)
.map(_.get)
.collect().toList // Always returns a List(Some(Article(null, null), Some(Article...
这是一个说明这个案例的笔记本https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/4480125715694487/1289561535151709/7956941984681624/latest.html
我的猜测是,当序列化然后反序列化Option值时,Spark使用Some()构造函数而不是检查Option是Some还是None。
我显然可以在我的对象周围创建一个包装器,像MaybeArticle(article: Option[Article])
,但我想知道Spark是否可以正确处理选项的数据集?
我认为解决方案是使用flatMap
。下面是一个非常愚蠢的例子:
scala> val ds = Seq(("a1"), ("a2"), ("a4"), ("b1"), ("b2")).toDS
ds: org.apache.spark.sql.Dataset[String] = [value: string]
scala> ds.show
+-----+
|value|
+-----+
| a1|
| a2|
| a4|
| b1|
| b2|
+-----+
scala> val ds2 = ds.flatMap{x => if (x.contains("a")) Some(x) else None}
ds2: org.apache.spark.sql.Dataset[String] = [value: string]
scala> ds2.show
+-----+
|value|
+-----+
| a1|
| a2|
| a4|
+-----+
这样做的原因是Some
和None
就像可以使用flatMap
(其中None
元素只是被提交)来解包的集合。