Spark / Scala:用最后的观察结果向前填充



Using Spark 1.4.0, Scala 2.10

我一直在试图找出一种用最后一个已知观察结果转发填充空值的方法,但我看不到一个简单的方法。我认为这是一件很常见的事情,但找不到说明如何做到这一点的示例。

我看到用值转发填充 NaN 的函数,或者通过偏移量填充或移动数据的滞后/领先函数,但没有任何东西可以拾取最后一个已知值。

在网上看,我在R中看到了很多关于相同事情的问答,但在Spark/Scala中没有。

我正在考虑在日期范围内映射,从结果中过滤 NaN 并选择最后一个元素,但我想我对语法感到困惑。

使用数据帧,我尝试类似的东西

import org.apache.spark.sql.expressions.Window
val sqlContext = new HiveContext(sc)
var spec = Window.orderBy("Date")
val df = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").option("inferSchema", "true").load("test.csv")
val df2 = df.withColumn("testForwardFill", (90 to 0).map(i=>lag(df.col("myValue"),i,0).over(spec)).filter(p=>p.getItem.isNotNull).last)

但这并不能让我得到任何地方。

过滤器部分不起作用;map 函数返回 Spark.sql.Columns 序列,但过滤器函数期望返回布尔值,所以我需要从 Column 中获取一个值进行测试,但似乎只有返回 Column 的 Column 方法。

有没有办法在 Spark 上更"简单"地做到这一点?

感谢您的输入

编辑

简单的示例示例输入:

2015-06-01,33
2015-06-02,
2015-06-03,
2015-06-04,
2015-06-05,22
2015-06-06,
2015-06-07,
...

预期产出:

2015-06-01,33
2015-06-02,33
2015-06-03,33
2015-06-04,33
2015-06-05,22
2015-06-06,22
2015-06-07,22

  1. 我有很多列,其中许多列都有这种缺失的数据模式,但不在同一日期/时间。如果我需要,我将一次转换一列。

编辑

按照@zero323的回答,我尝试了这种方式:

    import org.apache.spark.sql.Row
    import org.apache.spark.rdd.RDD
    val rows: RDD[Row] = df.orderBy($"Date").rdd

    def notMissing(row: Row): Boolean = { !row.isNullAt(1) }
    val toCarry: scala.collection.Map[Int,Option[org.apache.spark.sql.Row]] = rows.mapPartitionsWithIndex{
   case (i, iter) => Iterator((i, iter.filter(notMissing(_)).toSeq.lastOption)) }
.collectAsMap
    val toCarryBd = sc.broadcast(toCarry)
    def fill(i: Int, iter: Iterator[Row]): Iterator[Row] = { if (iter.contains(null)) iter.map(row => Row(toCarryBd.value(i).get(1))) else iter }
    val imputed: RDD[Row] = rows.mapPartitionsWithIndex{ case (i, iter) => fill(i, iter)}

广播变量最终成为没有 null 的值列表。这是进步,但我仍然无法使映射工作。但我什么也没得到,因为索引i 不映射到原始数据,它映射到没有 null 的子集。

我在这里错过了什么?

编辑和解决方案(从@zero323的答案推断(:

import org.apache.spark.sql.expressions.Window
val sqlContext = new HiveContext(sc)
var spec = Window.partitionBy("id").orderBy("Date")
val df = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").option("inferSchema", "true").load("test.csv")
val df2 = df.withColumn("test", coalesce((0 to 90).map(i=>lag(df.col("test"),i,0).over(spec)): _*))

如果您使用的是RDD而不是数据帧,请参阅下面的zero323的答案以获取更多选项。上面的解决方案可能不是最有效的,但对我有用。如果您想进行优化,请查看RDD解决方案。

初始答案(单个时间序列假设(:

首先,如果不能提供PARTITION BY子句,请尝试避免窗口函数。它将数据移动到单个分区,因此大多数时候根本不可行。

您可以做的是使用 mapPartitionsWithIndex 填补RDD的空白。由于您没有提供示例数据或预期输出,因此将其视为伪代码,而不是真正的 Scala 程序:

  • 首先让我们按日期订购DataFrame并转换为RDD

    import org.apache.spark.sql.Row
    import org.apache.spark.rdd.RDD
    val rows: RDD[Row] = df.orderBy($"Date").rdd
    
  • 接下来,让我们找到每个分区的最后一个非空观测值

    def notMissing(row: Row): Boolean = ???
    val toCarry: scala.collection.Map[Int,Option[org.apache.spark.sql.Row]] = rows
      .mapPartitionsWithIndex{ case (i, iter) => 
        Iterator((i, iter.filter(notMissing(_)).toSeq.lastOption)) }
      .collectAsMap
    
  • 并将此Map转换为广播

    val toCarryBd = sc.broadcast(toCarry)
    
  • 最后映射分区
  • 再次填补空白:

    def fill(i: Int, iter: Iterator[Row]): Iterator[Row] = {
      // If it is the beginning of partition and value is missing
      // extract value to fill from toCarryBd.value
      // Remember to correct for empty / only missing partitions
      // otherwise take last not-null from the current partition
    }
    val imputed: RDD[Row] = rows
      .mapPartitionsWithIndex{ case (i, iter) => fill(i, iter) } 
    
  • 最后转换回数据帧

编辑(分区/每个组数据的时间序列(:

细节决定成败。如果您的数据毕竟是分区的,那么可以使用groupBy解决整个问题。假设您简单地按类型为 T 的列"v"进行分区,并且Date是一个整数时间戳:

def fill(iter: List[Row]): List[Row] = {
  // Just go row by row and fill with last non-empty value
  ???
}
val groupedAndSorted = df.rdd
  .groupBy(_.getAs[T]("k"))
  .mapValues(_.toList.sortBy(_.getAs[Int]("Date")))
val rows: RDD[Row] = groupedAndSorted.mapValues(fill).values.flatMap(identity)
val dfFilled = sqlContext.createDataFrame(rows, df.schema)

这样,您可以同时填充所有列。

这可以使用DataFrames完成,而不是来回转换为RDD吗?

这取决于,尽管它不太可能有效。如果最大差距相对较小,您可以执行以下操作:

import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.{WindowSpec, Window}
import org.apache.spark.sql.Column
val maxGap: Int = ???  // Maximum gap between observations
val columnsToFill: List[String] = ???  // List of columns to fill
val suffix: String = "_" // To disambiguate between original and imputed 
// Take lag 1 to maxGap and coalesce
def makeCoalesce(w: WindowSpec)(magGap: Int)(suffix: String)(c: String) = {
  // Generate lag values between 1 and maxGap
  val lags = (1 to maxGap).map(lag(col(c), _)over(w))
  // Add current, coalesce and set alias
  coalesce(col(c) +: lags: _*).alias(s"$c$suffix")
}

// For each column you want to fill nulls apply makeCoalesce
val lags: List[Column] = columnsToFill.map(makeCoalesce(w)(maxGap)("_"))

// Finally select
val dfImputed = df.select($"*" :: lags: _*)

可以轻松调整以使用每列不同的最大间隙。

在最新的 Spark 版本中实现类似结果的一种更简单的方法是将lastignoreNulls 一起使用:

import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window
val w = Window.partitionBy($"k").orderBy($"Date")
  .rowsBetween(Window.unboundedPreceding, -1)
df.withColumn("value", coalesce($"value", last($"value", true).over(w)))

虽然可以删除partitionBy子句并全局应用此方法,但对于大型数据集,这种方法的成本过高。

只能使用 Window 函数(没有最后一个函数(和某种巧妙的分区来做到这一点。我个人真的很不喜欢不得不使用组合 组 然后进一步加入。

所以给定:

date,      currency, rate
20190101   JPY       NULL
20190102   JPY       2
20190103   JPY       NULL
20190104   JPY       NULL
20190102   JPY       3
20190103   JPY       4
20190104   JPY       NULL

我们可以使用 Window.unboundedPreceding 和 Window.unboundedFollow 来创建向前和向后填充的键。

以下代码:

val w1 = Window.partitionBy("currency").orderBy(asc("date"))
df
   .select("date", "currency", "rate")
   // Equivalent of fill.na(0, Seq("rate")) but can be more generic here
   // You may need an abs(col("rate")) if value col can be negative since it will not work with the following sums to build the forward and backward keys
   .withColumn("rate_filled", when(col("rate").isNull, lit(0)).otherwise(col("rate")))
   .withColumn("rate_backsum",
     sum("rate_filled").over(w1.rowsBetween(Window.unboundedPreceding, Window.currentRow)))
   .withColumn("rate_forwardsum",
     sum("rate_filled").over(w1.rowsBetween(Window.currentRow, Window.unboundedFollowing)))

给:

date,      currency, rate,  rate_filled, rate_backsum, rate_forwardsum
20190101   JPY       NULL             0             0             9
20190102   JPY       2                2             2             9
20190103   JPY       NULL             0             2             7
20190104   JPY       NULL             0             2             7
20190102   JPY       3                3             5             7
20190103   JPY       4                4             9             4
20190104   JPY       NULL             0             9             0

因此,我们构建了两个键(x_backsum 和 x_forwardsum(,可用于填充和填充。具有以下两条火花线:

val wb = Window.partitionBy("currency", "rate_backsum")
val wf = Window.partitionBy("currency", "rate_forwardsum")
   ...
   .withColumn("rate_backfilled", avg("rate").over(wb))
   .withColumn("rate_forwardfilled", avg("rate").over(wf))

最后:

date,      currency, rate,   rate_backsum, rate_forwardsum, rate_ffilled
20190101   JPY       NULL               0               9              2
20190102   JPY       2                  2               9              2
20190103   JPY       NULL               2               7              3
20190104   JPY       NULL               2               7              3
20190102   JPY       3                  5               7              3
20190103   JPY       4                  9               4              4
20190104   JPY       NULL               9               0              0

相关内容

  • 没有找到相关文章

最新更新