Understanding Dask distributed behavior with respect to dataframe operations



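I would like to understand better how dask.distributed works. I have a simple CSV that I read into a Dask dataframe, as shown below. This runs fine and returns an integer equal to the length of the dataframe, which is the behavior I expect.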

import dask.dataframe as dd
gdf = dd.read_csv(filepath)
len(gdf)
# returns some int value

But as soon as I create a Client instance from dask.distributed, I get the following error:

distributed.utils - ERROR - 'LocalFileSystem' object has no attribute 'cwd'

Here is an example code block:

from dask.distributed import Client
import dask.dataframe as dd
client_db = Client(remote_addr)
gdf = dd.read_csv(filepath)
len(gdf)
# throws the above error

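I'm confused: once it is instantiated, does the Client "inject" itself into all Dask operations? I thought I would need to do something like gdf = client_db.persist(gdf) to ask that Client connection to manage operations on this dataframe.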

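Some background on what is happening here would be much appreciated! From the traceback I can see that it involves Tornado, a Python web framework that supports WebSockets, long polling, and so on. I assume it is trying to store something... somewhere... but this is where my familiarity runs out.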

The traceback, in case it is needed:

Traceback (most recent call last):
  File "/.../geopandas_opt/venv/lib/python3.6/site-packages/distributed/utils.py", line 223, in f
    result[0] = yield make_coro()
  File "/.../geopandas_opt/venv/lib/python3.6/site-packages/tornado/gen.py", line 1015, in run
    value = future.result()
  File "/.../geopandas_opt/venv/lib/python3.6/site-packages/tornado/concurrent.py", line 237, in result
    raise_exc_info(self._exc_info)
  File "<string>", line 3, in raise_exc_info
  File "/.../geopandas_opt/venv/lib/python3.6/site-packages/tornado/gen.py", line 1021, in run
    yielded = self.gen.throw(*exc_info)
  File "/.../geopandas_opt/venv/lib/python3.6/site-packages/distributed/client.py", line 1156, in _gather
    traceback)
  File "/.../geopandas_opt/venv/lib/python3.6/site-packages/six.py", line 685, in reraise
    raise value.with_traceback(tb)
  File "/usr/local/lib/python3.5/site-packages/dask/bytes/core.py", line 212, in read_block_from_file
  File "/usr/local/lib/python3.5/site-packages/dask/bytes/core.py", line 314, in __enter__
  File "/usr/local/lib/python3.5/site-packages/dask/bytes/local.py", line 64, in open
  File "/usr/local/lib/python3.5/site-packages/dask/bytes/local.py", line 36, in _trim_filename
AttributeError: 'LocalFileSystem' object has no attribute 'cwd'

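Yes, when you create a Client, it registers itself as the default global scheduler, so subsequent calls such as len(gdf) are executed through it. You can avoid this behavior with the set_as_default= keyword: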

client = Client(..., set_as_default=False)

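As for the exception you are seeing, I suspect a version mismatch. The traceback points that way: the client frames come from a Python 3.6 virtualenv, while the frames that actually fail (dask/bytes/...) come from /usr/local/lib/python3.5, i.e. the remote workers, which run a different installation of dask. You may want to upgrade with conda or pip, in both environments: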

conda install dask distributed

pip install --upgrade dask distributed
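To confirm the mismatch before (and after) upgrading, the client can report the package versions seen on the client, the scheduler and every worker. A minimal sketch, assuming a recent version of distributed in which `Client.get_versions` returns per-component dictionaries with a flat `"packages"` mapping; `Client(processes=False)` again stands in for `Client(remote_addr)` only so the sketch is self-contained. With `check=True`, `get_versions` raises `ValueError` when the versions disagree, which is what would be expected against the cluster in the question.

```python
from dask.distributed import Client

# Stand-in for Client(remote_addr): an in-process LocalCluster.
client = Client(processes=False)

# check=True makes distributed raise ValueError if client, scheduler and
# workers report different versions of the packages it checks.
versions = client.get_versions(check=True)

# Show what each side is actually running.
client_pkgs = versions["client"]["packages"]
print("client:", client_pkgs.get("dask"), client_pkgs.get("distributed"))
for addr, info in versions["workers"].items():
    print(addr, info["packages"].get("dask"), info["packages"].get("distributed"))

client.close()
```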
