参考资料:
- https://examples.dask.org/applications/forecasting-with-prophet.html?highlight=prophet
- https://facebook.github.io/prophet/
需要注意的几点:
-
我总共有 48gb 的内存
-
这是我使用的库版本
- 蟒蛇 3.7.7
- DASK==2.18.0
- 先知==0.6
- 熊猫==1.0.3
我进口熊猫的原因只是为了这条线pd.options.mode.chained_assignment = None
这有助于解决 im 使用 dask.distributed 时出现 dask 错误
所以,我有一个 21gb 的 csv 文件,我正在使用 dask 和 jupyter 笔记本读取它...... 我试图从我的 mysql 数据库表中读取它,但是,内核最终崩溃了
我已经尝试了使用本地工作线程网络、线程和可用内存、可用storage_memory的多种组合,甚至尝试根本不使用distributed
。我也尝试过用熊猫分块(不是上面提到的与熊猫相关的行(,但是,即使使用分块,内核仍然崩溃......
我现在可以使用 dask 加载 csv,并应用一些转换,例如设置索引、添加 fbprophet 需要的列(名称(......但我仍然无法使用df.compute()
计算数据帧,因为这就是为什么我认为我收到了 fbprophet 的错误。在我添加了带有适当 dtypes 的列 y 和 ds 后,我收到了错误Truth of Delayed objects is not supported
,我认为这是因为 fbprophet 希望数据帧不会懒惰,这就是我尝试事先运行计算的原因。我还提高了客户端上的 ram 以允许它使用完整的 48gb,因为我怀疑它可能尝试加载数据两次,但是,这仍然失败,所以很可能情况并非如此/没有导致问题。
除此之外,dask 文档中还提到了 fbpropphet 用于将机器学习应用于数据帧,但是,我真的不明白为什么这不起作用...... 我也用射线和dask尝试过莫丁,结果基本相同。
另一个问题...关于内存使用情况distributed.worker - WARNING - Memory use is high but worker has no data to store to disk. Perhaps some other process is leaking memory? Process memory: 32.35 GB -- Worker memory limit: 25.00 GB
我在分配客户端、读取 csv 文件以及将操作/转换应用于数据帧时收到此错误,但是分配的大小大于 csv 文件本身,所以这让我感到困惑......
我自己做了什么来尝试解决这个问题: - 谷歌搜索当然,没有找到任何东西:-/ - 多次询问不和谐的帮助渠道 - 多次向IIRC寻求帮助渠道
无论如何,非常感谢对此问题的任何帮助!! 提前谢谢你:)
MCVE
from dask.distributed import Client
import dask.dataframe as dd
import pandas as pd
from fbprophet import Prophet
pd.options.mode.chained_assignment = None
client = Client(n_workers=2, threads_per_worker=4, processes=False, memory_limit='4GB')
csv_file = 'provide_your_own_csv_file_here.csv'
df = dd.read_csv(csv_file, parse_dates=['Time (UTC)'])
df = df.set_index('Time (UTC)')
df['y'] = df[['a','b']].mean(axis=1)
m = Prophet(daily_seasonality=True)
m.fit(df)
# ERROR: Truth of Delayed objects is not supported
不幸的是,Prophet 目前不支持 Dask 数据帧。
您引用的示例显示了使用 Dask 加速 Prophet 在 Pandas 数据帧上的拟合。 Dask 数据帧只是人们使用 Dask 的一种方式。
如前所述,一种方法是将dask.delayed
与熊猫一起使用DataFrame
,然后跳过dask.dataframe
。
您可以使用简化版本的load
-clean
-analyze
管道,用于使用 Dask 的自定义计算。
以下是基于这种类型的自定义管道的一种可能方法,使用小型数据集(创建 MCVE( - 管道中的每个步骤都将延迟
进口
import numpy as np
import pandas as pd
from dask import delayed
from dask.distributed import Client
from fbprophet import Prophet
在.csv
中生成一些数据,列名Time (UTC)
、a
和b
def generate_csv(nrows, fname):
df = pd.DataFrame(np.random.rand(nrows, 2), columns=["a", "b"])
df["Time (UTC)"] = pd.date_range(start="1850-01-01", periods=nrows)
df.to_csv(fname, index=False)
首先从管道中写入load
函数,使用 Pandas 加载.csv
,并使用dask.delayed
装饰器延迟其执行
- 最好将
read_csv
与nrows
一起使用,以查看管道对数据子集的执行情况,而不是全部加载 - 这将返回一个
dask.delayed
对象,而不是一个pandas.DataFrame
@delayed
def load_data(fname, nrows=None):
return pd.read_csv(fname, nrows=nrows)
现在创建process
函数,以使用pandas
处理数据,再次延迟,因为它的输入是dask.delayed
对象而不是pandas.DataFrame
@delayed
def process_data(df):
df = df.rename(columns={"Time (UTC)": "ds"})
df["y"] = df[["a", "b"]].mean(axis=1)
return df
最后一个函数 - 这个函数将训练fbprophet
数据(从.csv
加载并处理,但延迟(以进行预测。此analyze
函数也会延迟,因为它的输入之一是dask.delayed
对象
@delayed
def analyze(df, horizon):
m = Prophet(daily_seasonality=True)
m.fit(df)
future = m.make_future_dataframe(periods=horizon)
forecast = m.predict(future)
return forecast
运行管道(如果从 Python 脚本运行,则需要__name__ == "__main__"
- 管道的输出(
fbprophet
预测(存储在一个变result
中,该变是延迟的- 当计算此输出时,这将生成一个
pandas.DataFrame
(对应于fbprophet
预测的输出(,因此可以使用result.compute()
进行评估
- 当计算此输出时,这将生成一个
if __name__ == "__main__":
horizon = 8
num_rows_data = 40
num_rows_to_load = 35
csv_fname = "my_file.csv"
generate_csv(num_rows_data, csv_fname)
client = Client() # modify this as required
df = load_data(csv_fname, nrows=num_rows_to_load)
df = process_data(df)
result = analyze(df, horizon)
forecast = result.compute()
client.close()
assert len(forecast) == num_rows_to_load + horizon
print(forecast[["ds", "yhat", "yhat_lower", "yhat_upper"]].head())
输出
ds yhat yhat_lower yhat_upper
0 1850-01-01 0.330649 0.095788 0.573378
1 1850-01-02 0.493025 0.266692 0.724632
2 1850-01-03 0.573344 0.348953 0.822692
3 1850-01-04 0.491388 0.246458 0.712400
4 1850-01-05 0.307939 0.066030 0.548981