Transitioning from local Dask to a cluster



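I have a simple, embarrassingly parallel program that I am running successfully on Dask locally. Yay! Now I want to move it to a cluster and scale up the problem size. In this case I am using GCP. I have tried two approaches, GCPCluster() and HelmCluster(), and each fails in a different way. (I have successfully instantiated GCE compute before, so we can assume all my security/login credentials are sorted out. Networking may be another story.) Here is the main routine: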

from dask import delayed
from dask.distributed import Client, wait, as_completed, LocalCluster
from dask_kubernetes import HelmCluster
from dask_cloudprovider.gcp import GCPCluster
from problem.loop import inner_loop
from problem.problemSpec import problemInit

# gRange = 99
gRange = 12

def phase_transition(client: Client):
    p = problemInit()
    m = p.m
    loop = delayed(inner_loop)
    loops = [loop(int(m[i])) for i in range(gRange)]
    # visualize(loops, filename='delayed_results', format='svg')
    futures = client.compute(loops)
    wait(futures)
    for future, result in as_completed(futures, with_results=True):
        print(result)

if __name__ == "__main__":
    # with LocalCluster(dashboard_address='localhost:8787') as cluster:
    with GCPCluster(projectid='random-words-654321', machine_type='n1-standard-4', n_workers=2) as cluster:
        with Client(cluster) as client:
            phase_transition(client)

With GCPCluster(), the program hangs waiting for a response from the scheduler. Here are the log messages:

Launching cluster with the following configuration: 
Source Image: projects/ubuntu-os-cloud/global/images/ubuntu-minimal-1804-bionic-v20201014 
Docker Image: daskdev/dask:latest 
Machine Type: n1-standard-4 
Filesytsem Size: 50 
Disk Type: pd-standard 
N-GPU Type:  
Zone: us-east1-c 
Creating scheduler instance
dask-837e1ad1-scheduler
Internal IP: 10.142.0.4
External IP: 35.237.42.13
Waiting for scheduler to run at 35.237.42.13:8786

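The scheduler machine is up, and I can SSH into it. This looks like a networking problem. (By the way, I am running this from PyCharm with a Conda environment similar to the one the daskdev/dask:latest image calls for.) Clearly, we have not even reached the point of installing my local code on the cloud.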
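
A quick way to test the networking hypothesis from the client machine is to check whether the scheduler port accepts TCP connections at all. The sketch below uses only the standard library; the helper name port_is_open is hypothetical, and the address in the usage comment is the external IP and port from the log above.

```python
import socket

def port_is_open(host, port, timeout=5.0):
    """Return True if a TCP connection to (host, port) succeeds within `timeout` seconds."""
    try:
        # create_connection raises OSError (including timeouts) when the port is unreachable
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False

# Example call, using the scheduler address from the log:
# port_is_open("35.237.42.13", 8786)
```

If this returns False while SSH to the same machine works, a firewall rule on the GCP network that does not allow port 8786 is one plausible cause, though that is a guess from the symptoms rather than a confirmed diagnosis.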

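This is the kind of problem that experience with Dask and GCP would solve, and I do not have that experience yet. So let me try a different path from the documentation and start a k8s cluster managed by Helm. The only change to my code is: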

if __name__ == "__main__":
    cluster = HelmCluster(release_name='gke-dask')
    with Client(cluster) as client:
        phase_transition(client)

This gets much further. It now has trouble finding the code that lives in the problem subdirectory on my local machine. Here is the log:

Forwarding from 127.0.0.1:65410 -> 8786
Forwarding from [::1]:65410 -> 8786
Handling connection for 65410
Handling connection for 65410
/Users/awd/opt/anaconda3/envs/dask-cvxpy/lib/python3.8/site-packages/distributed/client.py:1140: VersionMismatchWarning: Mismatched versions found
+---------+---------------+---------------+---------------+
| Package | client        | scheduler     | workers       |
+---------+---------------+---------------+---------------+
| blosc   | None          | 1.9.2         | 1.9.2         |
| lz4     | 3.1.3         | 3.1.1         | 3.1.1         |
| msgpack | 1.0.2         | 1.0.0         | 1.0.0         |
| numpy   | 1.20.2        | 1.18.1        | 1.18.1        |
| python  | 3.8.8.final.0 | 3.8.0.final.0 | 3.8.0.final.0 |
+---------+---------------+---------------+---------------+
Notes: 
-  msgpack: Variation is ok, as long as everything is above 0.6
warnings.warn(version_module.VersionMismatchWarning(msg[0]["warning"]))
Handling connection for 65410
Handling connection for 65410
Handling connection for 65410
Traceback (most recent call last):
  File "<input>", line 1, in <module>
  File "/Applications/PyCharm.app/Contents/plugins/python/helpers/pydev/_pydev_bundle/pydev_umd.py", line 197, in runfile
    pydev_imports.execfile(filename, global_vars, local_vars)  # execute the script
  File "/Applications/PyCharm.app/Contents/plugins/python/helpers/pydev/_pydev_imps/_pydev_execfile.py", line 18, in execfile
    exec(compile(contents+"\n", file, 'exec'), glob, loc)
  File "/Users/awd/Projects/Stats285/ExamplePhaseTransition/main_func.py", line 39, in <module>
    phase_transition(client)
  File "/Users/awd/Projects/Stats285/ExamplePhaseTransition/main_func.py", line 28, in phase_transition
    for future, result in as_completed(futures, with_results=True):
  File "/Users/awd/opt/anaconda3/envs/dask-cvxpy/lib/python3.8/site-packages/distributed/client.py", line 4336, in __next__
    return self._get_and_raise()
  File "/Users/awd/opt/anaconda3/envs/dask-cvxpy/lib/python3.8/site-packages/distributed/client.py", line 4327, in _get_and_raise
    raise exc.with_traceback(tb)
  File "/opt/conda/lib/python3.8/site-packages/distributed/protocol/pickle.py", line 75, in loads
ModuleNotFoundError: No module named 'problem'
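
The last frame of the traceback is under /opt/conda/..., a path that belongs to the remote image rather than the local Mac, which suggests the remote side cannot import problem when it unpickles the tasks. A minimal sketch of one possible workaround: zip the package and push it to the workers with Client.upload_file, which accepts .py, .egg and .zip files. This assumes problem is a regular package (it contains an __init__.py) made only of pure-Python files. The helper names zip_package and ship_package are hypothetical, and the sketch is untested against the Helm deployment above.

```python
import zipfile
from pathlib import Path

def zip_package(package_dir, archive_path):
    """Zip the .py files of a package so the archive root holds the package folder itself."""
    package_dir = Path(package_dir).resolve()
    archive_path = Path(archive_path)
    with zipfile.ZipFile(archive_path, "w", zipfile.ZIP_DEFLATED) as zf:
        for path in sorted(package_dir.rglob("*.py")):
            # Store as "problem/loop.py" rather than an absolute path,
            # so "import problem.loop" resolves from the archive root.
            zf.write(path, path.relative_to(package_dir.parent).as_posix())
    return archive_path

def ship_package(client, package_dir="problem", archive_path="problem.zip"):
    """Zip the package and send it to the currently connected workers."""
    archive = zip_package(package_dir, archive_path)
    client.upload_file(str(archive))  # `client` is a dask.distributed.Client
    return archive
```

With this in place, ship_package(client) would be called once before phase_transition(client). Workers that join later may not receive the file, so for a long-lived cluster, building the code into the worker image is the more robust alternative.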

In practice, I am looking for help with either problem. I have a slight preference for the GCPCluster() solution.

I have the same problem with Fargate. It works locally, but not on AWS Fargate:

  File "/opt/conda/lib/python3.8/site-packages/distributed/protocol/pickle.py", line 75, in loads
    return pickle.loads(x)
ModuleNotFoundError: No module named 'userActivity'
It is apparently linked to a PYTHONPATH mismatch between the client and the workers.
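
One way to check that suspicion is to ask every worker whether it can locate the module, using Client.run, which executes a function on each worker and returns a dict keyed by worker address. This is a sketch under assumptions: the helper names can_import and report_importability are hypothetical, and the helpers are assumed to be defined in the client script itself (not inside the missing package) so that they can be shipped to the workers.

```python
import importlib.util

def can_import(module_name):
    """Return True if `module_name` can be located on this process's import path."""
    try:
        return importlib.util.find_spec(module_name) is not None
    except (ImportError, ValueError):
        # Raised for dotted names with a missing parent, or for modules without a spec
        return False

def report_importability(client, module_name):
    """Run can_import on every worker; returns {worker_address: bool}."""
    return client.run(can_import, module_name)

# Example call, with a connected dask.distributed.Client:
# report_importability(client, "userActivity")
```

A result of False on the workers alongside True on the client would be consistent with the import path mismatch described above.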
