我正在研究一个使用历史数据和传入数据进行分析的项目。我想学习如何管理在任务上更新传入数据,而不必每次都调度所有的历史数据。
我收集用于分析的时间序列数据,但时间序列随着传入数据而增长,并且每个流的传入数据需要发送给适当的工作人员,以进行ARMA分析等操作。如果我对天气进行ARMA分析,我希望将气压与温度分开,并通过比较压力与压力和温度与温度来执行分析。我不想将新的温度数据附加到先前的温度数据中,并将现在更大的系列分派给新的工作人员。我希望只将新的温度数据发送给已经拥有所有先前温度数据的任务工作者,以此类推。如何确保先前的温度数据在工人上持续存在,以及如何将(仅)新的温度数据分派给具有先前数据的工人。
我已经用任务做了一些基本的事情,但是所有的基本课程都没有解决历史记录和方法在结果的仅工人持久性上的持久性。此外,这些数据不是基于任务系列或数据框架,而是基于包含与分析方法相关的不同数据和方法的类。所以我不能有效地使用任务系列或数据框架。
如有任何帮助,不胜感激
这可能不是正确的解决方案,但一种可能性是指定特定的工作者来执行特定的计算。例如,让我们将工人分成两组:
# instantiate workers
from distributed import Client
c = Client(n_workers=5)
# here the separation is done based on order
# but custom logic can be implemented instead
workers_pressure = list(c.scheduler_info()['workers'])[3:]
workers_temperature = list(c.scheduler_info()['workers'])[:3]
现在,对于与pressure
相关的任务,我们可以指定与pressure
相关的工作人员:
data_pressure = [4,5,6]
data_temperature = [1,2,3]
# scatter data to pressure/temperature workers
d_p = client.scatter(data_pressure, workers= workers_pressure)
d_t = client.scatter(data_temperature, workers=workers_temperature)
# submit computations to specific workers
function_pressure = lambda x: x**2
function_temperature = lambda x: x**2
f_p = client.map(function_pressure, d_p, workers=workers_pressure)
f_t = client.map(function_temperature, d_t, workers= workers_temperature)
在上面的代码片段中,指定处理压力数据的工作人员将用于运行压力计算。
如果你有一个非常异构的任务集,这将不能很好地扩展。如果这是你的情况,我会首先构建任务图(DAG),然后让dask
处理最有效的工人分配到任务。