我刚刚学会了python多处理。我想创建一个模型来模拟在网络中发送和接收消息的过程。有向图描述两个节点之间的关系,字典描述两个节点之间的通信。此字典的值的数据类型为队列。但我遇到了一些错误:
from concurrent.futures import ProcessPoolExecutor
from multiprocessing import Manager
PoolGroup=[('R1','R2','R3'),('N1','N2','N3'),('E1','E2','E3')]
PoolElement=['R1','R2','R3','N1','N2','N3','E1','E2','E3']
graph={'R1':['N1','N2','N3'],
'R2':['N1','N2','N3'],
'R3':['N1','N2','N3'],
'N1':['E1','E2','E3'],
'N2':['E1','E2','E3'],
'N3':['E1','E2','E3'],
'E1':[],
'E2':[],
'E3':[]}
def addSigal(target,information):
AllQueue[target].put(information)
print("Succeed in sending msg to "+target)
print(target+' now has ',AllQueue[target].qsize(),' signals')
def pool1function(name,information):
targetlist=list(graph[name])
print(name+" send information to "+str(targetlist))
with ProcessPoolExecutor() as pool1:
pool1.map(addSigal,targetlist,[information]*3)
if __name__=='__main__':
m=Manager()
AllQueue=m.dict()
AllQueue.update({PE:m.Queue() for PE in PoolElement})
with ProcessPoolExecutor() as pool:
pool.map(pool1function,PoolGroup[0],[1,2,3])
不幸的是,结果刚刚显示:
R1 send information to ['N1', 'N2', 'N3']
R2 send information to ['N1', 'N2', 'N3']
R3 send information to ['N1', 'N2', 'N3']
这意味着信息不会发送到相应的节点。所以我检查了AllQueue,发现了一些奇怪的东西:当我打印AllQueue['R1']时,它显示:
RemoteError:
---------------------------------------------------------------------------
Unserializable message: ('#RETURN', <queue.Queue object at 0x10edd8dd8>)
---------------------------------------------------------------------------
我也未能从 AllQueue['R1'] 放置或获取元素,有什么问题?
这是将字典传递给任务的示例:
from concurrent.futures import ProcessPoolExecutor
from multiprocessing import Manager
def addSigal(target, information, q):
print(target,information,q)
q[target]=(information)
print("Succeed in sending msg to "+target)
print(target+' now has ',q[target])
if __name__=='__main__':
m = Manager()
AllQueue = m.dict()
AllQueue.update({'A':0,'B':1})
with ProcessPoolExecutor() as pool:
pool.map(addSigal,'AB', [1, 2],[AllQueue,AllQueue])
print(AllQueue)