在pyspark数据帧中每60行应用一个函数



我的数据帧被称为df,有123729行,看起来像这样:

+---+------+------+
| HR|maxABP|Second|
+---+------+------+
|110| 128.0|     1|
|110| 127.0|     2|
|111| 127.0|     3|
|111| 127.0|     4|
|111| 126.0|     5|
|111| 127.0|     6|
|109| 126.0|     7|
|111| 126.0|     8|

我需要将每60行(或秒(聚合为多个值。每一分钟,我都想知道最小心率、平均心率、最大心率,以及在这几秒内最大ABP是否低于85。所需的输出看起来类似于下表,其中如果maxABP<85,否则为0。

警报1
Min_HRMax_HR1avg_HR分钟
70100801
60907502

我认为groupBy足以获得所需的结果。

df.show()
+---+------+------+
| HR|maxABP|Second|
+---+------+------+
|110| 128.0|    10|
|110| 127.0|    20|
|111| 127.0|    30|
|111| 127.0|    40|
|111| 126.0|    50|
|111| 127.0|    60|
|109| 126.0|    70|
|111| 126.0|    80|
+---+------+------+
df.withColumn('Minute', f.expr('cast(Second / 60 as int)')) 
.groupBy('Minute').agg( 
f.round(f.min('HR'), 2).alias('Min_HR'), 
f.round(f.max('HR'), 2).alias('Max_HR'), 
f.round(f.avg('HR'), 2).alias('Avg_HR'), 
f.max('maxABP').alias('maxABP')) 
.withColumn('Alarm', f.expr('if(maxABP < 85, 1, 0)')) 
.show()
+------+------+------+------+------+-----+
|Minute|Min_HR|Max_HR|Avg_HR|maxABP|Alarm|
+------+------+------+------+------+-----+
|     1|   109|   111|110.33| 127.0|    0|
|     0|   110|   111| 110.6| 128.0|    0|
+------+------+------+------+------+-----+

示例DF:

df = spark.createDataFrame(
[
(110, 128.0, 1),(110, 127.0, 2),(111, 127.0, 3),(111, 127.0, 4)
,(111, 126.0, 5),(111, 127.0, 6),(109, 126.0, 7),(111, 126.0, 1001)
,(114, 126.0, 1003),(115, 83.0, 1064),(116, 127.0, 1066)
], ['HR', 'maxABP', 'Second']
)
+---+------+------+
| HR|maxABP|Second|
+---+------+------+
|110| 128.0|     1|
|110| 127.0|     2|
|111| 127.0|     3|
|111| 127.0|     4|
|111| 126.0|     5|
|111| 127.0|     6|
|109| 126.0|     7|
|111| 126.0|  1001|
|114| 126.0|  1003|
|115|  83.0|  1064|
|116| 127.0|  1066|

然后使用窗口功能:

import pyspark.sql.functions as F
from pyspark.sql.window import Window
w1 = (Window.partitionBy(F.col('Minute')))
df
.withColumn('Minute', F.round(F.col('Second')/60,0)+1)
.withColumn('Min_HR', F.min('HR').over(w1))
.withColumn('Max_HR', F.max('HR').over(w1))
.withColumn('Avg_HR', F.round(F.avg('HR').over(w1),0))
.withColumn('Min_ABP', F.round(F.min('maxABP').over(w1),0))
.select('Min_HR','Max_HR','Min_ABP','Avg_HR','Minute')
.dropDuplicates()
.withColumn('Alarm', F.when(F.col('Min_ABP')<85, 1).otherwise(F.lit('0')))
.select('Min_HR','Max_HR','Avg_HR','Alarm','Minute')
.orderBy('Minute')
.show()
+------+------+------+-----+------+
|Min_HR|Max_HR|Avg_HR|Alarm|Minute|
+------+------+------+-----+------+
|   109|   111| 110.0|    0|   1.0|
|   111|   114| 113.0|    0|  18.0|
|   115|   116| 116.0|    1|  19.0|

最新更新