使用Dask进行基于日期时间索引的切片



我有两个数据帧:links有两个日期时间列,分别称为onsetoffset,每一行都是一个事件。另一个数据帧被称为sensors,用频率为1m的日期时间索引进行索引,并且有大约600列,每个列对应一个sensor-id。本质上,对于每一个links行,我想使用起始值和偏移值作为时间范围来从sensors中分割相关行,通过获得平均值来跨行聚合它们,然后将单行平均传感器值水平连接到links数据帧。我已经通过下面的Pandas代码做到了这一点,它很有效,但我有很多数据,而且速度非常慢。

def search_sensors(sensors, start_time, stop_time):
s = sensors[start_time: stop_time]
s = s.mean()
return s
# add column names of sensor-ids
links[sensors.columns] = None
for index, row in links.iterrows():
start_time = row['start_time']
stop_time = row['stop_time']
mean_sensors = search_sensors(sensors, start_time, stop_time)
links.iloc[index, sensors.columns] = mean_sensors.to_list()

我已经和达斯克试过一些东西,但没有运气。

  1. 使用dask.delayed()和Pandas,我得到一个UserWarning: Large object of size 35.62 MiB detected in task graph:
mean_sensors_list = []
for index, row in links.iterrows():
start_time = row['start_time']
stop_time = row['stop_time']
mean_sensors = dask.delayed(search_sensors)(sensors, start_time, stop_time)
links_list.append(mean_sensors)   # mean_sensors is delayed object containing a pandas.Series of shape (600, nan) 
results = dask.compute(*mean_sensors_list)
  1. dask.dataframe()与以下代码一起使用与Pandas一样慢,而且我在Dask仪表板中没有看到任何并行化指示
sensors_dd = dd.from_pandas(sensors_interp, npartitions=1)
links_dd = dd.from_pandas(links, npartitions=1)
mean_sensors_list = []
for index, row in links_dd.iterrows():
start_time = row['start_time']
stop_time = row['stop_time']
mean_sensors = search_sensors(sensors_dd, start_time, stop_time)
links_list.append(mean_sensors)   # mean_sensors is a dask.Series of shape (600, nan)
results = dask.compute(*mean_sensors_list)
  1. 使用1和2,即mean_sensors = dask.delayed(search_sensors)(sensors_dd, start_time, stop_time),mean_sensors是一个包含dask的延迟对象。系列造型(600,nan(但是执行起来很慢。Dashboard显示了3个任务(search_sensors、finalize、from_padas(的一些并行化,4个工作线程的CPU使用率非常低。另外,当我运行Ubuntu时,它会显示磁盘空间不足的消息

Dask新手,我不熟悉CCD_13。问题的解决方案如下:

PD_5在我的4核笔记本电脑中速度极快。

最新更新