I want to fill nan values with the last known observation - see: Spark / Scala: fill nan with last good observation.
My current solution uses window functions to get the job done, but that is not great, as all values end up in a single partition (a sketch of that variant follows the table below). Something like
val imputed: RDD[FooBar] = recordsDF.rdd.mapPartitionsWithIndex { case (i, iter) => fill(i, iter) }
should work far better. But strangely, my fill
function is never executed. What is wrong with my code?
+----------+--------------------+
| foo| bar|
+----------+--------------------+
|2016-01-01| first|
|2016-01-02| second|
| null| noValidFormat|
|2016-01-04|lastAssumingSameDate|
+----------+--------------------+
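A minimal sketch of the window-based variant mentioned above (an assumption of roughly what it looks like, reusing recordsDF from the full code below; the point is that an unpartitioned window moves all rows into a single partition):

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, last, monotonically_increasing_id}

// Sketch only: forward fill via a window function. Without partitionBy,
// Spark shuffles the whole dataset into one partition, which is the
// scalability concern described above.
val withId = recordsDF.withColumn("id", monotonically_increasing_id())
val w      = Window.orderBy("id").rowsBetween(Long.MinValue, 0)
val windowFilled = withId
  .withColumn("foo", last(col("foo"), ignoreNulls = true).over(w))
  .drop("id")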
Here is the full example code:
import java.sql.Date
import org.apache.log4j.{ Level, Logger }
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
case class FooBar(foo: Date, bar: String)
object WindowFunctionExample extends App {
  Logger.getLogger("org").setLevel(Level.WARN)

  val conf: SparkConf = new SparkConf()
    .setAppName("foo")
    .setMaster("local[*]")
  val spark: SparkSession = SparkSession
    .builder()
    .config(conf)
    .enableHiveSupport()
    .getOrCreate()

  import spark.implicits._

  val myDff = Seq(("2016-01-01", "first"), ("2016-01-02", "second"),
    ("2016-wrongFormat", "noValidFormat"),
    ("2016-01-04", "lastAssumingSameDate"))
  // the cast to Date turns the malformed date string into a null foo
  val recordsDF = myDff
    .toDF("foo", "bar")
    .withColumn("foo", 'foo.cast("Date"))
    .as[FooBar]
  recordsDF.show
  def notMissing(row: FooBar): Boolean = {
    row.foo != null
  }

  // last valid (non-null foo) row per partition, keyed by partition index
  val toCarry = recordsDF.rdd.mapPartitionsWithIndex { case (i, iter) =>
    Iterator((i, iter.filter(notMissing(_)).toSeq.lastOption))
  }.collectAsMap

  println("###################### carry ")
  println(toCarry)
  println(toCarry.foreach(println))
  println("###################### carry ")

  val toCarryBd = spark.sparkContext.broadcast(toCarry)
  def fill(i: Int, iter: Iterator[FooBar]): Iterator[FooBar] = {
    // last valid row carried for this partition
    var lastNotNullRow: FooBar = toCarryBd.value(i).get
    iter.map(row => {
      if (!notMissing(row))
        FooBar(lastNotNullRow.foo, row.bar)
      else {
        lastNotNullRow = row
        row
      }
    })
  }
  // The algorithm never seems to step into the branch that fills the null values. Strange.
  val imputed: RDD[FooBar] = recordsDF.rdd.mapPartitionsWithIndex { case (i, iter) => fill(i, iter) }
  val imputedDF = imputed.toDS()

  println(imputedDF.orderBy($"foo").collect.toList)
  imputedDF.show
  spark.stop
}
edit
I fixed the code as outlined in the comments, but toCarryBd now contains None values. How can that happen, when
def notMissing(row: FooBar): Boolean = { row.foo != null }
combined with
iter.filter(notMissing(_)).toSeq.lastOption
explicitly filters for non-None values? The collected map looks like this:
(2,None)
(5,None)
(4,None)
(7,Some(FooBar(2016-01-04,lastAssumingSameDate)))
(1,Some(FooBar(2016-01-01,first)))
(3,Some(FooBar(2016-01-02,second)))
(6,None)
(0,None)
Trying to access toCarryBd then fails with a NoSuchElementException: None.get.
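A minimal plain-Scala sketch of how such None entries can arise (an assumption for illustration: partitions that contain no valid row, either because they are empty or because their only row has a null foo):

// Hypothetical sketch: lastOption yields None whenever a partition contains
// no row that passes notMissing.
val emptyPartition: Iterator[FooBar] = Iterator.empty
emptyPartition.filter(notMissing).toSeq.lastOption            // None

val onlyInvalidRows = Iterator(FooBar(null, "noValidFormat"))
onlyInvalidRows.filter(notMissing).toSeq.lastOption           // None

// calling .get on such a None is what raises NoSuchElementException: None.get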
First of all, if your foo
field can be null, I would suggest creating the case class as:
case class FooBar(foo: Option[Date], bar: String)
Then you can rewrite your notMissing function as:
def notMissing(row: Option[FooBar]): Boolean = row.isDefined && row.get.foo.isDefined
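A few hypothetical calls to illustrate what this Option-based notMissing returns (values made up for the example, using the Option-based FooBar above):

notMissing(Some(FooBar(Some(Date.valueOf("2016-01-01")), "first")))  // true
notMissing(Some(FooBar(None, "noValidFormat")))                      // false: foo is missing
notMissing(None)                                                     // false: no row at all,
                                                                     //   e.g. an empty partition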