我的输入数据帧如下所示:
index bucket time ap station rssi
0 1 00:00 1 1 -84.0
1 1 00:00 1 3 -67.0
2 1 00:00 1 4 -82.0
3 1 00:00 1 2 -68.0
4 2 00:15 1 3 -83.0
5 2 00:15 1 2 -82.0
6 2 00:15 1 4 -80.0
7 2 00:15 1 1 -72.0
8 3 00:30 1 4 -85.0
9 3 00:30 1 3 -77.0
10 3 00:30 1 2 -70.0
我是 Scala Spark 的新手,我想像这样循环数据:
for each ap
for each station
for each bucket
if rssi(previous bucket)<rssi(bucket)
print message
这是我的火花应用程序的开始:
object coveralg {
def main(args: Array[String]) {
val spark = SparkSession.builder().appName("coveralg").getOrCreate()
import spark.implicits._
val input_data = spark.read.format("csv").option("header","true").load(args(0))
}
}
但是我不知道如何在数据帧上实现循环并选择值来执行 if
数据帧不是为此而设计的。 它们旨在对每条记录应用相同的转换或减少它们。 您可以添加一个带有实现if
Boolean
的列:
import org.apache.spark.sql.expressions.Window
val w = Window.partitionBy($"ap",$"station",$"bucket").
orderBy(unix_timestamp($"time")).
rangeBetween(Long.MinValue, -1)
val df = input_data.withColumn("shouldPrintMessage",when(max($"rssi".over(w))>$"rssi",true))