Spark scala dataframe for loop



我的输入数据帧如下所示:

index    bucket    time    ap   station    rssi
0         1        00:00   1       1       -84.0
1         1        00:00   1       3       -67.0
2         1        00:00   1       4       -82.0
3         1        00:00   1       2       -68.0
4         2        00:15   1       3       -83.0
5         2        00:15   1       2       -82.0
6         2        00:15   1       4       -80.0
7         2        00:15   1       1       -72.0
8         3        00:30   1       4       -85.0
9         3        00:30   1       3       -77.0
10        3        00:30   1       2       -70.0

我是 Scala Spark 的新手,我想像这样循环数据:

for each ap 
for each station 
for each bucket 
if rssi(previous bucket)<rssi(bucket)
print message

这是我的火花应用程序的开始:

object coveralg {
def main(args: Array[String]) {
val spark = SparkSession.builder().appName("coveralg").getOrCreate()
import spark.implicits._
val input_data =  spark.read.format("csv").option("header","true").load(args(0))
}
}

但是我不知道如何在数据帧上实现循环并选择值来执行 if

数据帧不是为此而设计的。 它们旨在对每条记录应用相同的转换或减少它们。 您可以添加一个带有实现ifBoolean的列:

import org.apache.spark.sql.expressions.Window
val w = Window.partitionBy($"ap",$"station",$"bucket").
orderBy(unix_timestamp($"time")).
rangeBetween(Long.MinValue, -1)
val df = input_data.withColumn("shouldPrintMessage",when(max($"rssi".over(w))>$"rssi",true))

最新更新