Forward fill: from PySpark to Scala



I have the following PySpark code; how do I change it so it works in Scala? It forward-fills and backward-fills the missing data.

import pyspark.sql.functions as F
from pyspark.sql import Window

df = spark.createDataFrame([
    ('d1', None),
    ('d2', 10),
    ('d3', None),
    ('d4', 30),
    ('d5', None),
    ('d6', None),
], ('day', 'temperature'))

w_forward = Window.partitionBy().orderBy('day').rowsBetween(Window.unboundedPreceding, Window.currentRow)
w_backward = Window.partitionBy().orderBy('day').rowsBetween(Window.currentRow, Window.unboundedFollowing)

df.withColumn('fill_forward', F.last('temperature', ignorenulls=True).over(w_forward)) \
  .withColumn('fill_both', F.first('fill_forward', ignorenulls=True).over(w_backward)) \
  .show()

Here it is in Scala:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{first, last}
import spark.implicits._

// A case class lets Spark derive the schema, including the nullable
// temperature column, from the type itself.
case class Day(day: String, temperature: Option[Int])

val df = spark.createDataFrame[Day](
  Seq(
    Day("d1", None),
    Day("d2", Some(10)),
    Day("d3", None),
    Day("d4", Some(30)),
    Day("d5", None),
    Day("d6", None)
  )
)

// Forward window: from the start of the ordering up to the current row.
val wForward = Window
  .partitionBy()
  .orderBy($"day")
  .rowsBetween(Window.unboundedPreceding, Window.currentRow)

// Backward window: from the current row to the end of the ordering.
val wBackWard = Window
  .partitionBy()
  .orderBy($"day")
  .rowsBetween(Window.currentRow, Window.unboundedFollowing)

df.withColumn(
    "fill_forward",
    last($"temperature", ignoreNulls = true).over(wForward)
  )
  .withColumn(
    "fill_both",
    first("fill_forward", ignoreNulls = true).over(wBackWard)
  )
  .show()
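
With these six rows, both versions should print roughly the following: d1 has no earlier non-null value, so only the backward pass (fill_both) can fill it, while every other gap is closed by the forward pass.

+---+-----------+------------+---------+
|day|temperature|fill_forward|fill_both|
+---+-----------+------------+---------+
| d1|       null|        null|       10|
| d2|         10|          10|       10|
| d3|       null|          10|       10|
| d4|         30|          30|       30|
| d5|       null|          30|       30|
| d6|       null|          30|       30|
+---+-----------+------------+---------+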

Pretty simple, isn't it?

The main difference is that you can use a case class if you want to avoid setting the DataFrame schema explicitly with Row.
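
For comparison, here is a minimal sketch of the Row route (assuming the same spark session); the schema has to be spelled out by hand and the missing temperatures become plain nulls:

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

// Explicit schema instead of a case class.
val schema = StructType(Seq(
  StructField("day", StringType, nullable = false),
  StructField("temperature", IntegerType, nullable = true)
))

val rows = Seq(
  Row("d1", null),
  Row("d2", 10),
  Row("d3", null),
  Row("d4", 30),
  Row("d5", null),
  Row("d6", null)
)

// createDataFrame(RDD[Row], StructType) builds the same DataFrame without Day.
val dfFromRows = spark.createDataFrame(spark.sparkContext.parallelize(rows), schema)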
