无法使用 dask 将数据帧与 fbprophet 一起读取 csv 到数据帧中



参考资料:

  • https://examples.dask.org/applications/forecasting-with-prophet.html?highlight=prophet
  • https://facebook.github.io/prophet/


需要注意的几点:

  • 我总共有 48gb 的内存

  • 这是我使用的库版本

    • 蟒蛇 3.7.7
    • DASK==2.18.0
    • 先知==0.6
    • 熊猫==1.0.3

我进口熊猫的原因只是为了这条线
pd.options.mode.chained_assignment = None
这有助于解决 im 使用 dask.distributed 时出现 dask 错误

所以,我有一个 21gb 的 csv 文件,我正在使用 dask 和 jupyter 笔记本读取它...... 我试图从我的 mysql 数据库表中读取它,但是,内核最终崩溃了

我已经尝试了使用本地工作线程网络、线程和可用内存、可用storage_memory的多种组合,甚至尝试根本不使用distributed。我也尝试过用熊猫分块(不是上面提到的与熊猫相关的行(,但是,即使使用分块,内核仍然崩溃......

我现在可以使用 dask 加载 csv,并应用一些转换,例如设置索引、添加 fbprophet 需要的列(名称(......但我仍然无法使用df.compute()计算数据帧,因为这就是为什么我认为我收到了 fbprophet 的错误。在我添加了带有适当 dtypes 的列 y 和 ds 后,我收到了错误Truth of Delayed objects is not supported,我认为这是因为 fbprophet 希望数据帧不会懒惰,这就是我尝试事先运行计算的原因。我还提高了客户端上的 ram 以允许它使用完整的 48gb,因为我怀疑它可能尝试加载数据两次,但是,这仍然失败,所以很可能情况并非如此/没有导致问题。

除此之外,dask 文档中还提到了 fbpropphet 用于将机器学习应用于数据帧,但是,我真的不明白为什么这不起作用...... 我也用射线和dask尝试过莫丁,结果基本相同。

另一个问题...关于内存使用情况distributed.worker - WARNING - Memory use is high but worker has no data to store to disk. Perhaps some other process is leaking memory? Process memory: 32.35 GB -- Worker memory limit: 25.00 GB我在分配客户端、读取 csv 文件以及将操作/转换应用于数据帧时收到此错误,但是分配的大小大于 csv 文件本身,所以这让我感到困惑......

我自己做了什么来尝试解决这个问题: - 谷歌搜索当然,没有找到任何东西:-/ - 多次询问不和谐的帮助渠道 - 多次向IIRC寻求帮助渠道

无论如何,非常感谢对此问题的任何帮助!! 提前谢谢你:)

MCVE

from dask.distributed import Client
import dask.dataframe as dd
import pandas as pd
from fbprophet import Prophet
pd.options.mode.chained_assignment = None
client = Client(n_workers=2, threads_per_worker=4, processes=False, memory_limit='4GB')
csv_file = 'provide_your_own_csv_file_here.csv'
df = dd.read_csv(csv_file, parse_dates=['Time (UTC)'])
df = df.set_index('Time (UTC)')
df['y'] = df[['a','b']].mean(axis=1)
m = Prophet(daily_seasonality=True)
m.fit(df)
# ERROR: Truth of Delayed objects is not supported

不幸的是,Prophet 目前不支持 Dask 数据帧。

您引用的示例显示了使用 Dask 加速 Prophet 在 Pandas 数据帧上的拟合。 Dask 数据帧只是人们使用 Dask 的一种方式。

如前所述,一种方法是将dask.delayed与熊猫一起使用DataFrame,然后跳过dask.dataframe

您可以使用简化版本的load-clean-analyze管道,用于使用 Dask 的自定义计算。

以下是基于这种类型的自定义管道的一种可能方法,使用小型数据集(创建 MCVE( - 管道中的每个步骤都将延迟

进口

import numpy as np
import pandas as pd
from dask import delayed
from dask.distributed import Client
from fbprophet import Prophet

.csv中生成一些数据,列名Time (UTC)ab

def generate_csv(nrows, fname):
df = pd.DataFrame(np.random.rand(nrows, 2), columns=["a", "b"])
df["Time (UTC)"] = pd.date_range(start="1850-01-01", periods=nrows)
df.to_csv(fname, index=False)

首先从管道中写入load函数,使用 Pandas 加载.csv,并使用dask.delayed装饰器延迟其执行

  • 最好将read_csvnrows一起使用,以查看管道对数据子集的执行情况,而不是全部加载
  • 这将返回一个dask.delayed对象,而不是一个pandas.DataFrame
@delayed
def load_data(fname, nrows=None):
return pd.read_csv(fname, nrows=nrows)

现在创建process函数,以使用pandas处理数据,再次延迟,因为它的输入是dask.delayed对象而不是pandas.DataFrame

@delayed
def process_data(df):
df = df.rename(columns={"Time (UTC)": "ds"})
df["y"] = df[["a", "b"]].mean(axis=1)
return df

最后一个函数 - 这个函数将训练fbprophet数据(从.csv加载并处理,但延迟(以进行预测。此analyze函数也会延迟,因为它的输入之一是dask.delayed对象

@delayed
def analyze(df, horizon):
m = Prophet(daily_seasonality=True)
m.fit(df)
future = m.make_future_dataframe(periods=horizon)
forecast = m.predict(future)
return forecast

运行管道(如果从 Python 脚本运行,则需要__name__ == "__main__"

(
  • 管道的输出(fbprophet预测(存储在一个变result中,该变是延迟的
    • 当计算此输出时,这将生成一个pandas.DataFrame(对应于fbprophet预测的输出(,因此可以使用result.compute()
    • 进行评估
if __name__ == "__main__":
horizon = 8
num_rows_data = 40
num_rows_to_load = 35
csv_fname = "my_file.csv"
generate_csv(num_rows_data, csv_fname)
client = Client()  # modify this as required
df = load_data(csv_fname, nrows=num_rows_to_load)
df = process_data(df)
result = analyze(df, horizon)
forecast = result.compute()
client.close()
assert len(forecast) == num_rows_to_load + horizon
print(forecast[["ds", "yhat", "yhat_lower", "yhat_upper"]].head())

输出

ds      yhat  yhat_lower  yhat_upper
0 1850-01-01  0.330649    0.095788    0.573378
1 1850-01-02  0.493025    0.266692    0.724632
2 1850-01-03  0.573344    0.348953    0.822692
3 1850-01-04  0.491388    0.246458    0.712400
4 1850-01-05  0.307939    0.066030    0.548981

最新更新