我可以在pySpark python中实现这一点吗?通过优化一点或不优化。我被屏蔽了
初始csv
日期 | 时间 | idSensor1 | valueSensor1 | idSensor2 | valueSensor2 | 20210113 | 10:01:01.000000 | 171 | 5 | 173 | 8 |
---|---|---|---|---|---|
20210113 | 10:05:05.111111 | 171 | 6 | 172 | 8 |
20210113 | 10:08:08.222222 | 171 | 4 | 172 | 8 |
20210113 | 10:10:10.333333 | 171 | 6 | 172 | 8 |
20210113 | 10:15:15.444444 | 171 | 8 | 172 | 10 |
20210113 | 10:18:18.555555 | 171 | 9 | 172 | 10 |
您可以在按unix时间戳排序的窗口上使用collect_list
,范围从当前行前600秒(10分钟)到当前行前1秒:
df2 = df.withColumn(
'intervalsValuesSensor1',
F.collect_list('valueSensor1').over(
Window.partitionBy('idSensor1')
.orderBy(F.unix_timestamp(F.concat('Date', 'Time'), 'yyyyMMddHH:mm:ss.SSSSSS'))
.rangeBetween(-600, -1)
)
)
df2.show()
+--------+---------------+---------+------------+---------+------------+----------------------+
| Date| Time|idSensor1|valueSensor1|idSensor2|valueSensor2|intervalsValuesSensor1|
+--------+---------------+---------+------------+---------+------------+----------------------+
|20210113|10:01:01.000000| 171| 5| 173| 8| []|
|20210113|10:05:05.111111| 171| 6| 172| 8| [5]|
|20210113|10:08:08.222222| 171| 4| 172| 8| [5, 6]|
|20210113|10:10:10.333333| 171| 6| 172| 8| [5, 6, 4]|
|20210113|10:15:15.444444| 171| 8| 172| 10| [4, 6]|
|20210113|10:18:18.555555| 171| 9| 172| 10| [6, 8]|
+--------+---------------+---------+------------+---------+------------+----------------------+
对于Sensor2
也可以这样做。对于求和,可以用F.collect_list(F.col('valueSensor1') + F.col('valueSensor2'))