My DataFrame is called df, has 123729 rows, and looks like this:
+---+------+------+
| HR|maxABP|Second|
+---+------+------+
|110| 128.0| 1|
|110| 127.0| 2|
|111| 127.0| 3|
|111| 127.0| 4|
|111| 126.0| 5|
|111| 127.0| 6|
|109| 126.0| 7|
|111| 126.0| 8|
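I need to aggregate every 60 rows (or seconds) into several values. For each minute, I want to know the minimum heart rate, the average heart rate, the maximum heart rate, and whether maxABP was below 85 during those seconds. The desired output looks something like the table below, where Alarm is 1 if maxABP < 85 and 0 otherwise.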
Min_HR | Max_HR | Avg_HR | Alarm | Minute |
---|---|---|---|---|
70 | 100 | 80 | 1 | 1 |
60 | 90 | 75 | 0 | 2 |
I think groupBy is enough to get the desired result.
df.show()
+---+------+------+
| HR|maxABP|Second|
+---+------+------+
|110| 128.0| 10|
|110| 127.0| 20|
|111| 127.0| 30|
|111| 127.0| 40|
|111| 126.0| 50|
|111| 127.0| 60|
|109| 126.0| 70|
|111| 126.0| 80|
+---+------+------+
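import pyspark.sql.functions as f

# Bucket each row into a minute (Second / 60, truncated to an integer), then aggregate per minute.
(df.withColumn('Minute', f.expr('cast(Second / 60 as int)'))
   .groupBy('Minute').agg(
       f.round(f.min('HR'), 2).alias('Min_HR'),
       f.round(f.max('HR'), 2).alias('Max_HR'),
       f.round(f.avg('HR'), 2).alias('Avg_HR'),
       f.max('maxABP').alias('maxABP'))
   # Alarm is 1 only when the largest maxABP in the minute is below 85, i.e. every reading
   # was below 85. Use f.min('maxABP') instead if a single low reading should raise it.
   .withColumn('Alarm', f.expr('if(maxABP < 85, 1, 0)'))
   .show())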
+------+------+------+------+------+-----+
|Minute|Min_HR|Max_HR|Avg_HR|maxABP|Alarm|
+------+------+------+------+------+-----+
| 1| 109| 111|110.33| 127.0| 0|
| 0| 110| 111| 110.6| 128.0| 0|
+------+------+------+------+------+-----+
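As a cross-check on the aggregation above, here is a minimal pure-Python sketch of the same per-minute grouping on the eight sample rows. The helper name `summarize` and the two alarm fields `Alarm_all` and `Alarm_any` are illustrative assumptions, not part of the answer; they are there to show how the alarm differs depending on whether every reading or any single reading has to fall below the threshold.

```python
from collections import defaultdict

# The eight sample rows shown above, as (HR, maxABP, Second).
rows = [
    (110, 128.0, 10), (110, 127.0, 20), (111, 127.0, 30), (111, 127.0, 40),
    (111, 126.0, 50), (111, 127.0, 60), (109, 126.0, 70), (111, 126.0, 80),
]

def summarize(rows, threshold=85):
    """Group rows by minute (Second // 60) and compute per-minute statistics."""
    buckets = defaultdict(list)
    for hr, abp, second in rows:
        buckets[second // 60].append((hr, abp))

    summary = {}
    for minute, readings in sorted(buckets.items()):
        hrs = [hr for hr, _ in readings]
        abps = [abp for _, abp in readings]
        summary[minute] = {
            'Min_HR': min(hrs),
            'Max_HR': max(hrs),
            'Avg_HR': round(sum(hrs) / len(hrs), 2),
            # 1 only if every reading in the minute is below the threshold
            'Alarm_all': int(max(abps) < threshold),
            # 1 if at least one reading in the minute is below the threshold
            'Alarm_any': int(min(abps) < threshold),
        }
    return summary

summary = summarize(rows)
for minute, stats in summary.items():
    print(minute, stats)
```

On this sample both alarm variants are 0 for both minutes, since no maxABP value is below 85; they only diverge when a minute mixes readings above and below the threshold.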
Sample DataFrame:
df = spark.createDataFrame(
[
(110, 128.0, 1),(110, 127.0, 2),(111, 127.0, 3),(111, 127.0, 4)
,(111, 126.0, 5),(111, 127.0, 6),(109, 126.0, 7),(111, 126.0, 1001)
,(114, 126.0, 1003),(115, 83.0, 1064),(116, 127.0, 1066)
], ['HR', 'maxABP', 'Second']
)
+---+------+------+
| HR|maxABP|Second|
+---+------+------+
|110| 128.0| 1|
|110| 127.0| 2|
|111| 127.0| 3|
|111| 127.0| 4|
|111| 126.0| 5|
|111| 127.0| 6|
|109| 126.0| 7|
|111| 126.0| 1001|
|114| 126.0| 1003|
|115| 83.0| 1064|
|116| 127.0| 1066|
+---+------+------+
Then use window functions:
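import pyspark.sql.functions as F
from pyspark.sql.window import Window

# One window per minute; the Minute column itself is created in the chain below.
w1 = Window.partitionBy(F.col('Minute'))

(df
 # round() assigns each second to the nearest minute boundary (e.g. second 31 lands in minute 2);
 # F.floor would give whole clock minutes instead.
 .withColumn('Minute', F.round(F.col('Second') / 60, 0) + 1)
 .withColumn('Min_HR', F.min('HR').over(w1))
 .withColumn('Max_HR', F.max('HR').over(w1))
 .withColumn('Avg_HR', F.round(F.avg('HR').over(w1), 0))
 .withColumn('Min_ABP', F.round(F.min('maxABP').over(w1), 0))
 .select('Min_HR', 'Max_HR', 'Min_ABP', 'Avg_HR', 'Minute')
 .dropDuplicates()
 # Integer literal 0 (not the string '0') keeps the Alarm column numeric.
 .withColumn('Alarm', F.when(F.col('Min_ABP') < 85, 1).otherwise(F.lit(0)))
 .select('Min_HR', 'Max_HR', 'Avg_HR', 'Alarm', 'Minute')
 .orderBy('Minute')
 .show())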
+------+------+------+-----+------+
|Min_HR|Max_HR|Avg_HR|Alarm|Minute|
+------+------+------+-----+------+
| 109| 111| 110.0| 0| 1.0|
| 111| 114| 113.0| 0| 18.0|
| 115| 116| 116.0| 1| 19.0|
+------+------+------+-----+------+
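The Minute values 1, 18 and 19 above come from rounding Second / 60 to the nearest integer and adding 1. The sketch below, in plain Python, compares that rule with a floor-based one. The function names `minute_by_round` and `minute_by_floor` are hypothetical, and the half-up rounding is emulated by hand because Python's built-in `round()` rounds halves to even, whereas Spark's `round` rounds halves up.

```python
import math

def minute_by_round(second):
    """Emulate round(Second / 60, 0) + 1 with half-up rounding."""
    return math.floor(second / 60 + 0.5) + 1

def minute_by_floor(second):
    """Alternative: whole clock minutes, counted from 1."""
    return second // 60 + 1

# Compare both rules on a few seconds, including values from the sample DataFrame.
for s in (1, 29, 31, 59, 61, 1001, 1064):
    print(s, minute_by_round(s), minute_by_floor(s))
```

With rounding, seconds 31 to 59 already fall into minute 2, so the buckets are centred on minute boundaries rather than aligned with them. If each bucket should cover seconds 0 to 59, 60 to 119, and so on, a floor-based rule is probably what is wanted.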