Dask distributed library giving a serialization error



I have initialized the cluster with 10 workers and 4 threads per worker, and I am running this on a 12-core laptop.

cluster = makeIndividualDashboard.LocalCluster(n_workers=10, threads_per_worker=4)
client = makeIndividualDashboard.Client()
runOna(client)
client.shutdown()

Below is the code I use for the cluster computation.

st = settings.as_dict()
new_settings = namedtuple("Settings", st.keys())(*st.values())
to_process = []
client.cluster.scale(10)
if mongoConnection:
    mongo_c = True
else:
    mongo_c = None
future = client.scatter([net, new_settings, avgNodesConnected, kcoreByGroup, averageTeamDensity,
                         edgesInByAttributeTableMeans, edgesInByAttributeTable, crossTeamTiesTable,
                         descendentLookup, groupDegreeTable, respondentDegreeTable, degreeTable,
                         orgTeamTree, teamMembership, graphId, selectionRange, criteria,
                         onlyForNodes, hashIds, useEnvironment, rollupToLeaders, averageTeamSize,
                         meanCrossInTiesPct, meanCrossOutTiesPct, meanCrossAllTiesPct, mongo_c])
for node in nodes:
    if FILTER_FOR_USER == None or node == FILTER_FOR_USER:
        to_process.append(dask.delayed(run_me)(node, *future))
dask.compute(*to_process)

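Yes, this looks a bit messy, because run_me is one big function that I have not been able to modularize better so far; maybe I will in the future. The problem is that the job works fine with 5 or fewer workers, but as soon as I increase the number of workers I get a serialization error.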

distributed.protocol.core - CRITICAL - Failed to Serialize
Traceback (most recent call last):
File "/Users/omtripa/anaconda3/envs/ONA-Transformation/lib/python3.7/site-packages/distributed/protocol/core.py", line 44, in dumps
for key, value in data.items()
File "/Users/omtripa/anaconda3/envs/ONA-Transformation/lib/python3.7/site-packages/distributed/protocol/core.py", line 45, in <dictcomp>
if type(value) is Serialize
File "/Users/omtripa/anaconda3/envs/ONA-Transformation/lib/python3.7/site-packages/distributed/protocol/serialize.py", line 210, in serialize
TypeError: ('Could not serialize object of type float64.', '0.68')
distributed.comm.utils - ERROR - ('Could not serialize object of type float64.', '0.68')

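Again, this is very strange, because if I run the same program on a Linux server with 35 cores and set the number of workers to 30, it works fine. I am not sure where the problem is. Is it specific to my local machine? I can look into the serialization issue, but why does it work only with 5 workers?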

Thanks in advance for your help.

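The error says that some object you are trying to send to a worker cannot be serialized. The type is float64, which is perhaps a numpy.float64 object? I don't really know what is going on in your case. I have verified that Dask moves NumPy float64 objects around just fine: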

In [1]: from dask.distributed import Client
In [2]: client = Client()
In [3]: import numpy as np
In [4]: x = np.float64(1)
In [5]: future = client.scatter(x)
In [6]: future.result()
Out[6]: 1.0

I encourage you to provide an MCVE. See https://stackoverflow.com/help/minimal-reproducible-example
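As a starting point for narrowing this down, here is a minimal sketch of how you might check which of the objects passed to client.scatter fails to serialize. It uses only the standard-library pickle module as a rough proxy. Dask has its own serializers and falls back to cloudpickle, so plain pickle is stricter than what Dask actually does, and a failure here is only a hint, not proof. The helper name find_unpicklable and the sample objects are hypothetical and not taken from your code.

```python
import pickle
import threading


def find_unpicklable(named_objects):
    """Return {name: error text} for every object that pickle.dumps rejects.

    Hypothetical helper for illustration; plain pickle is only a proxy
    for Dask's own serialization.
    """
    failures = {}
    for name, obj in named_objects.items():
        try:
            pickle.dumps(obj)
        except Exception as exc:  # pickle raises several different error types
            failures[name] = f"{type(exc).__name__}: {exc}"
    return failures


# Stand-in objects (assumptions); replace them with the real arguments,
# e.g. {"net": net, "new_settings": new_settings, ...}.
candidates = {
    "score": 0.68,               # a plain float pickles fine
    "settings": {"a": 1},        # so does a dict of simple values
    "lock": threading.Lock(),    # a lock cannot be pickled
}

failures = find_unpicklable(candidates)
for name, reason in failures.items():
    print(name, "->", reason)
```

Once a suspect object is identified, scattering just that one object to a small cluster would make a much shorter reproducible example than the full run_me call.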
