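I'd like to better understand how dask.distributed works. I have a simple CSV that I read into a Dask dataframe as shown below. This runs fine and returns an integer for the length of the dataframe, which is the behavior I expect.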
import dask.dataframe as dd
gdf = dd.read_csv(filepath)
len(gdf)
# returns some int value
But as soon as I bring in a Client instance from dask.distributed, I get the following error:
distributed.utils - ERROR - 'LocalFileSystem' object has no attribute 'cwd'
Here is a sample code block:
from dask.distributed import Client
import dask.dataframe as dd
client_db = Client(remote_addr)
gdf = dd.read_csv(filepath)
len(gdf)
# throws the above error
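I'm confused: once instantiated, does the Client "inject" itself into all Dask operations? I thought I would need to do something like gdf = client_db.persist(gdf) to ask that Client connection to manage operations on that dataframe.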
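Some background on what's happening here would be much appreciated! From the traceback I can see it has something to do with Tornado, a Python web framework that supports WebSockets, long polling, and so on. I assume it's trying to store something... somewhere... but that's where my familiarity ends.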
Traceback, if needed:
Traceback (most recent call last):
  File "/.../geopandas_opt/venv/lib/python3.6/site-packages/distributed/utils.py", line 223, in f
    result[0] = yield make_coro()
  File "/.../geopandas_opt/venv/lib/python3.6/site-packages/tornado/gen.py", line 1015, in run
    value = future.result()
  File "/.../geopandas_opt/venv/lib/python3.6/site-packages/tornado/concurrent.py", line 237, in result
    raise_exc_info(self._exc_info)
  File "<string>", line 3, in raise_exc_info
  File "/.../geopandas_opt/venv/lib/python3.6/site-packages/tornado/gen.py", line 1021, in run
    yielded = self.gen.throw(*exc_info)
  File "/.../geopandas_opt/venv/lib/python3.6/site-packages/distributed/client.py", line 1156, in _gather
    traceback)
  File "/.../geopandas_opt/venv/lib/python3.6/site-packages/six.py", line 685, in reraise
    raise value.with_traceback(tb)
  File "/usr/local/lib/python3.5/site-packages/dask/bytes/core.py", line 212, in read_block_from_file
  File "/usr/local/lib/python3.5/site-packages/dask/bytes/core.py", line 314, in __enter__
  File "/usr/local/lib/python3.5/site-packages/dask/bytes/local.py", line 64, in open
  File "/usr/local/lib/python3.5/site-packages/dask/bytes/local.py", line 36, in _trim_filename
AttributeError: 'LocalFileSystem' object has no attribute 'cwd'
Yes, when you create a Client it registers itself as the default global scheduler. You can avoid this behavior with the set_as_default= keyword:
client = Client(..., set_as_default=False)
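As for the exception you're seeing, I suspect it's a version mismatch. The traceback hints at this: the client-side frames come from a Python 3.6 virtualenv, while the dask/bytes frames come from a Python 3.5 install under /usr/local. You may want to upgrade with conda or pip so that every environment involved runs the same versions.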
conda install dask distributed
or
pip install --upgrade dask distributed
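If you want to confirm a mismatch before or after upgrading, one option is Client.get_versions. The sketch below assumes an in-process local cluster just so it runs on its own; against a real deployment you would pass your scheduler address to Client instead.

```python
from dask.distributed import Client

# Assumption: a local in-process cluster; with a real deployment,
# connect with Client(<your scheduler address>) instead.
client = Client(processes=False)

# Collects version information from the client, scheduler and workers.
# With check=True, distributed is documented to raise an error when
# the package versions do not match.
versions = client.get_versions(check=True)

# Top-level keys identify which side each report came from.
print(sorted(versions))
client.close()
```

Here everything lives in one process, so the check should pass trivially; the call is more informative when the client and the workers run from different environments.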