火花图分区以填充NAN值



我想使用最后一个已知的观察填充nan值 - 请参见:Spark/Scala:填充NAN,用最后一个好观察

我当前的解决方案使用的窗口功能是为了完成任务。但这不是很好,因为所有值都映射到一个分区中。 val imputed: RDD[FooBar] = recordsDF.rdd.mapPartitionsWithIndex { case (i, iter) => fill(i, iter) }应该更好地工作。但奇怪的是,我的fill功能没有执行。我的代码怎么了?

+----------+--------------------+
|       foo|                 bar|
+----------+--------------------+
|2016-01-01|               first|
|2016-01-02|              second|
|      null|       noValidFormat|
|2016-01-04|lastAssumingSameDate|
+----------+--------------------+

这是完整的示例代码:

import java.sql.Date
import org.apache.log4j.{ Level, Logger }
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
case class FooBar(foo: Date, bar: String)
object WindowFunctionExample extends App {
  Logger.getLogger("org").setLevel(Level.WARN)
val conf: SparkConf = new SparkConf()
    .setAppName("foo")
    .setMaster("local[*]")
  val spark: SparkSession = SparkSession
    .builder()
    .config(conf)
    .enableHiveSupport()
    .getOrCreate()
  import spark.implicits._
  val myDff = Seq(("2016-01-01", "first"), ("2016-01-02", "second"),
    ("2016-wrongFormat", "noValidFormat"),
    ("2016-01-04", "lastAssumingSameDate"))
  val recordsDF = myDff
    .toDF("foo", "bar")
    .withColumn("foo", 'foo.cast("Date"))
    .as[FooBar]
  recordsDF.show
  def notMissing(row: FooBar): Boolean = {
    row.foo != null
  }
  val toCarry = recordsDF.rdd.mapPartitionsWithIndex { case (i, iter) => Iterator((i, iter.filter(notMissing(_)).toSeq.lastOption)) }.collectAsMap
  println("###################### carry ")
  println(toCarry)
  println(toCarry.foreach(println))
  println("###################### carry ")
  val toCarryBd = spark.sparkContext.broadcast(toCarry)
  def fill(i: Int, iter: Iterator[FooBar]): Iterator[FooBar] = {
    var lastNotNullRow: FooBar = toCarryBd.value(i).get
    iter.map(row => {
      if (!notMissing(row))1
        FooBar(lastNotNullRow.foo, row.bar)
      else {
        lastNotNullRow = row
        row
      }
    })
  }
  // The algorithm does not step into the for loop for filling the null values. Strange
  val imputed: RDD[FooBar] = recordsDF.rdd.mapPartitionsWithIndex { case (i, iter) => fill(i, iter) }
  val imputedDF = imputed.toDS()
  println(imputedDF.orderBy($"foo").collect.toList)
  imputedDF.show
  spark.stop
}

编辑

我修复了评论所述的代码。但是toCarryBd包含None值。当我对

明确过滤时,这怎么会发生
def notMissing(row: FooBar): Boolean = {row.foo != null}
iter.filter(notMissing(_)).toSeq.lastOption

None值。

(2,None)
(5,None)
(4,None)
(7,Some(FooBar(2016-01-04,lastAssumingSameDate)))
(1,Some(FooBar(2016-01-01,first)))
(3,Some(FooBar(2016-01-02,second)))
(6,None)
(0,None)

尝试访问toCarryBd时,这会导致NoSuchElementException: None.get

首先,如果您的foo字段可能为null,我建议将案例类创建为:

case class FooBar(foo: Option[Date], bar: String)

然后,您可以将您的通用功能重写为:

def notMissing(row: Option[FooBar]): Boolean = row.isDefined && row.get.foo.isDefined

相关内容

  • 没有找到相关文章