火花 (Scala) 数据帧过滤 (FIR)



假设我有一个数据帧(存储在scala val中作为df),其中包含来自csv的数据:

time,temperature
0,65
1,67
2,62
3,59

我在 scala 语言中将其作为 Spark 数据帧从文件中读取没有问题。

我想

添加一个过滤列(过滤器是指信号处理移动平均过滤),(假设我想做(T[n]+T[n-1])/2.0):

time,temperature,temperatureAvg
0,65,(65+0)/2.0
1,67,(67+65)/2.0
2,62,(62+67)/2.0
3,59,(59+62)/2.0

(实际上,对于第一行,我想要32.5而不是(65+0)/2.0.我写它是为了澄清预期的 2 时间步长过滤操作输出)

那么如何实现这一目标呢?我不熟悉沿列迭代组合行的 Spark 数据帧操作......

Spark 3.1+

取代

$"time".cast("timestamp")

import org.apache.spark.sql.functions.timestamp_seconds
timestamp_seconds($"time")

星火 2.0+

在 Spark 2.0 及更高版本中,可以使用window函数作为 groupBy 的输入。它允许您指定 windowDurationslideDurationstartTime(偏移量)。它仅适用于TimestampType列,但找到解决方法并不难。在您的情况下,需要一些额外的步骤来纠正边界,但一般解决方案可以表示如下:

import org.apache.spark.sql.functions.{window, avg}
df
    .withColumn("ts", $"time".cast("timestamp"))
    .groupBy(window($"ts", windowDuration="2 seconds", slideDuration="1 second"))
    .avg("temperature")

火花<2.0

如果有一种自然的方法来对数据进行分区,则可以按如下方式使用窗口函数:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.mean
val w = Window.partitionBy($"id").orderBy($"time").rowsBetween(-1, 0)
val df = sc.parallelize(Seq(
    (1L, 0, 65), (1L, 1, 67), (1L, 2, 62), (1L, 3, 59)
)).toDF("id", "time", "temperature")
df.select($"*", mean($"temperature").over(w).alias("temperatureAvg")).show
// +---+----+-----------+--------------+                             
// | id|time|temperature|temperatureAvg|
// +---+----+-----------+--------------+
// |  1|   0|         65|          65.0|
// |  1|   1|         67|          66.0|
// |  1|   2|         62|          64.5|
// |  1|   3|         59|          60.5|
// +---+----+-----------+--------------+

您可以使用lead/lag函数创建具有任意权重的窗口:

 lit(0.6) * $"temperature" + 
 lit(0.3) * lag($"temperature", 1) +
 lit(0.2) * lag($"temperature", 2)

如果没有partitionBy条款,它仍然是可能的,但效率极低。如果是这种情况,您将无法使用DataFrames .相反,您可以使用 RDD 上的sliding(例如,请参阅 Spark 中 RDD 中的邻居元素操作)。还有火花时间序列包,你可能会觉得有用。

相关内容

  • 没有找到相关文章

最新更新