joblib connection to a Dask backend: tornado.iostream.StreamClosedError: Stream is closed



I am running a simple program on my dask workers. Here is the program.

import numpy as np
import pandas as pd
import joblib
from dask.distributed import Client
from sklearn.datasets import load_digits
from sklearn.model_selection import RandomizedSearchCV
from sklearn.svm import SVC

client = Client('127.0.0.1:30006', timeout=10000)
client.get_versions(check=True)

digits = load_digits()
param_space = {
    'C': np.logspace(-6, 6, 13),
    'gamma': np.logspace(-8, 8, 17),
    'tol': np.logspace(-4, -1, 4),
    'class_weight': [None, 'balanced'],
}
model = SVC(kernel='rbf')
search = RandomizedSearchCV(model, param_space, cv=3, n_iter=50, verbose=10)

with joblib.parallel_backend('dask'):  # Running it on the dask workers
    search.fit(digits.data, digits.target)

30006 is the port on which my scheduler is running.
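As a quick sanity check, something like the following can confirm that the client actually sees the scheduler at that port and the workers attached to it (a minimal sketch, not part of the original script):

from dask.distributed import Client

client = Client('127.0.0.1:30006', timeout=10000)
print(client)                     # summary: scheduler address and worker count
print(client.nthreads())          # threads per worker, keyed by worker address
print(list(client.scheduler_info()['workers']))  # worker addresses the scheduler knows about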

I get the following error.

tornado.application - ERROR - Exception in callback functools.partial(<bound method IOLoop._discard_future_result of <tornado.platform.asyncio.AsyncIOLoop object at 0x000001DC4E701850>>, <Task finished name='Task-42' coro=<DaskDistributedBackend.apply_async.<locals>.f() done, defined at C:UsersUserDocumentscodecondavirtualenvlibsite-packagesjoblib_dask.py:316> exception=CommClosedError('in <closed TCP>: Stream is closed')>)
Traceback (most recent call last):
File "C:UsersUserDocumentscodecondavirtualenvlibsite-packagesdistributedcommtcp.py", line 187, in read
n_frames = await stream.read_bytes(8)
tornado.iostream.StreamClosedError: Stream is closed
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "C:UsersUserDocumentscodecondavirtualenvlibsite-packagestornadoioloop.py", line 741, in _run_callback
ret = callback()
File "C:UsersUserDocumentscodecondavirtualenvlibsite-packagestornadoioloop.py", line 765, in _discard_future_result
future.result()
File "C:UsersUserDocumentscodecondavirtualenvlibsite-packagesjoblib_dask.py", line 317, in f
batch, tasks = await self._to_func_args(func)
File "C:UsersUserDocumentscodecondavirtualenvlibsite-packagesjoblib_dask.py", line 306, in _to_func_args
await maybe_to_futures(kwargs.values())))
File "C:UsersUserDocumentscodecondavirtualenvlibsite-packagesjoblib_dask.py", line 289, in maybe_to_futures
[f] = await self.client.scatter(
File "C:UsersUserDocumentscodecondavirtualenvlibsite-packagesdistributedclient.py", line 2084, in _scatter
await self.scheduler.scatter(
File "C:UsersUserDocumentscodecondavirtualenvlibsite-packagesdistributedcore.py", line 852, in send_recv_from_rpc
result = await send_recv(comm=comm, op=key, **kwargs)
File "C:UsersUserDocumentscodecondavirtualenvlibsite-packagesdistributedcore.py", line 635, in send_recv
response = await comm.read(deserializers=deserializers)
File "C:UsersUserDocumentscodecondavirtualenvlibsite-packagesdistributedcommtcp.py", line 202, in read
convert_stream_closed_error(self, e)
File "C:UsersUserDocumentscodecondavirtualenvlibsite-packagesdistributedcommtcp.py", line 126, in convert_stream_closed_error
raise CommClosedError("in %s: %s" % (obj, exc)) from exc
distributed.comm.core.CommClosedError: in <closed TCP>: Stream is closed
tornado.application - ERROR - Exception in callback functools.partial(<bound method IOLoop._discard_future_result of <tornado.platform.asyncio.AsyncIOLoop object at 0x000001DC4E701850>>, <Task finished name='Task-44' coro=<DaskDistributedBackend.apply_async.<locals>.f() done, defined at C:UsersUserDocumentscodecondavirtualenvlibsite-packagesjoblib_dask.py:316> exception=CommClosedError('in <closed TCP>: Stream is closed')>)
Traceback (most recent call last):
File "C:UsersUserDocumentscodecondavirtualenvlibsite-packagesdistributedcommtcp.py", line 187, in read
n_frames = await stream.read_bytes(8)
tornado.iostream.StreamClosedError: Stream is closed

Here is my package information:

{
"scheduler": {
"host": {
"python": "3.8.0.final.0",
"python-bits": 64,
"OS": "Linux",
"OS-release": "5.4.72-microsoft-standard-WSL2",
"machine": "x86_64",
"processor": "",
"byteorder": "little",
"LC_ALL": "C.UTF-8",
"LANG": "C.UTF-8"
},
"packages": {
"python": "3.8.0.final.0",
"dask": "2021.01.0",
"distributed": "2021.01.0",
"msgpack": "1.0.0",
"cloudpickle": "1.6.0",
"tornado": "6.1",
"toolz": "0.11.1",
"numpy": "1.18.1",
"lz4": "3.1.1",
"blosc": "1.9.2"
}
},
"workers": {
"tcp://10.1.1.92:37435": {
"host": {
"python": "3.8.0.final.0",
"python-bits": 64,
"OS": "Linux",
"OS-release": "5.4.72-microsoft-standard-WSL2",
"machine": "x86_64",
"processor": "",
"byteorder": "little",
"LC_ALL": "C.UTF-8",
"LANG": "C.UTF-8"
},
"packages": {
"python": "3.8.0.final.0",
"dask": "2021.01.0",
"distributed": "2021.01.0",
"msgpack": "1.0.0",
"cloudpickle": "1.6.0",
"tornado": "6.1",
"toolz": "0.11.1",
"numpy": "1.18.1",
"lz4": "3.1.1",
"blosc": "1.9.2"
}
},
"tcp://10.1.1.93:45855": {
"host": {
"python": "3.8.0.final.0",
"python-bits": 64,
"OS": "Linux",
"OS-release": "5.4.72-microsoft-standard-WSL2",
"machine": "x86_64",
"processor": "",
"byteorder": "little",
"LC_ALL": "C.UTF-8",
"LANG": "C.UTF-8"
},
"packages": {
"python": "3.8.0.final.0",
"dask": "2021.01.0",
"distributed": "2021.01.0",
"msgpack": "1.0.0",
"cloudpickle": "1.6.0",
"tornado": "6.1",
"toolz": "0.11.1",
"numpy": "1.18.1",
"lz4": "3.1.1",
"blosc": "1.9.2"
}
},
"tcp://10.1.1.94:36523": {
"host": {
"python": "3.8.0.final.0",
"python-bits": 64,
"OS": "Linux",
"OS-release": "5.4.72-microsoft-standard-WSL2",
"machine": "x86_64",
"processor": "",
"byteorder": "little",
"LC_ALL": "C.UTF-8",
"LANG": "C.UTF-8"
},
"packages": {
"python": "3.8.0.final.0",
"dask": "2021.01.0",
"distributed": "2021.01.0",
"msgpack": "1.0.0",
"cloudpickle": "1.6.0",
"tornado": "6.1",
"toolz": "0.11.1",
"numpy": "1.18.1",
"lz4": "3.1.1",
"blosc": "1.9.2"
}
}
},
"client": {
"host": {
"python": "3.8.6.final.0",
"python-bits": 64,
"OS": "Windows",
"OS-release": "10",
"machine": "AMD64",
"processor": "Intel64 Family 6 Model 142 Stepping 10, GenuineIntel",
"byteorder": "little",
"LC_ALL": "None",
"LANG": "None"
},
"packages": {
"python": "3.8.6.final.0",
"dask": "2021.01.0",
"distributed": "2021.01.0",
"msgpack": "1.0.0",
"cloudpickle": "1.6.0",
"tornado": "6.1",
"toolz": "0.11.1",
"numpy": "1.18.1",
"lz4": "None",
"blosc": "None"
}
}
}

I suspect the problem is with joblib, because if I run it without the line "with joblib.parallel_backend('dask'):" the fit command runs fine. I also tried a simple numpy array computation on the dask workers and it worked, so the connection between the dask workers and my client is fine. I have tried different versions of joblib (0.16.0, 0.17.0, 1.0.0, 1.0.1) and the same error persists.
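For reference, the "simple numpy array computation" check looked roughly like this (a minimal sketch assuming the same scheduler address, not the exact code):

import numpy as np
from dask.distributed import Client

client = Client('127.0.0.1:30006', timeout=10000)

# Submit a trivial task and pull the result back; if this round-trips,
# the client <-> scheduler <-> worker communication itself is fine.
future = client.submit(np.sum, np.arange(1000))
print(future.result())  # expected: 499500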

The problem was that the library versions running on the workers and on the client were different. I did a pip list on the workers and installed all the libraries with those exact versions in the client Dockerfile. It is working now, and I am able to run the fit on the dask workers.
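Not part of the original fix, but a minimal sketch of how such a mismatch can be surfaced from the client side, reusing the client.get_versions() call already in the script; the packages= argument, used here to also report scikit-learn and joblib, is assumed to be available in this version of distributed:

from dask.distributed import Client

client = Client('127.0.0.1:30006', timeout=10000)

# Ask the scheduler, workers and client to report their versions, including
# the libraries the search actually depends on (assumed packages= argument).
versions = client.get_versions(packages=['scikit-learn', 'joblib'])

client_pkgs = versions['client']['packages']
remotes = [('scheduler', versions['scheduler'])] + list(versions['workers'].items())
for name, info in remotes:
    for pkg, local_ver in client_pkgs.items():
        remote_ver = info['packages'].get(pkg)
        if remote_ver != local_ver:
            print(f"{name}: {pkg} differs (client={local_ver}, remote={remote_ver})")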
