Python 多处理队列未正确并行化



我很难让我的代码正确并行运行。我需要的是以下内容:

  • 我有 4 个磁盘代码,我需要在一定时间内以离散的时间步长发展
  • 我想并行发展这些代码中的每一个

由于代码和我需要存储的数据的性质,在尝试进程池后,我决定最好的方法是使用队列。我尝试了两种不同的代码。对于此示例,disk_codes只是数字,但在我的实际代码中,它们是另一个代码的单个实例,用于求解磁盘中的运动方程。

代码 1:

import multiprocessing
try:
import queue
except:
import Queue as queue
def evolve_single_disk(queue, dt):
print "Empty queue? ", queue.empty()
code = queue.get()
print "Evolving disk {0} in {1}".format(code, multiprocessing.current_process().name)
queue.task_done()
if __name__ == '__main__':
ncores = 4
code_queue = queue.Queue()
disk_codes = range(ncores)
for disk in disk_codes:
code_queue.put(disk)
dt = 1
t_end = 10
t = 0
# Evolve!
while t < t_end:
print "t=", t
processes = multiprocessing.Process(target=evolve_single_disk, args=(code_queue, dt, ))
processes.start()
processes.join()
disk_codes = code_queue.get()
print "disk codes: ", disk_codes

t += dt

这导致:

t= 0
Empty queue?  False
Evolving disk 0 in Process-1
disk codes:  0
t= 1
Empty queue?  False
Evolving disk 1 in Process-2
disk codes:  1
t= 2
Empty queue?  False
Evolving disk 2 in Process-3
disk codes:  2
t= 3
Empty queue?  False
Evolving disk 3 in Process-4
disk codes:  3
t= 4
Empty queue?  True

因此,在每个时间步中,其中一个磁盘都是"进化"的。这不是我想要的,因为我希望所有四个磁盘在同一时间步中并行发展。

然后我试了这个:

代码 2:

import multiprocessing
try:
import queue
except:
import Queue as queue

def evolve_single_disk(queue, dt):
print "Empty queue? ", queue.empty()
code = queue.get()  
print "Evolving disk {0} in {1}".format(code, multiprocessing.current_process().name)
queue.task_done()

if __name__ == '__main__':
ncores = 4
code_queue = queue.Queue()
disk_codes = range(ncores)
for disk in disk_codes:
code_queue.put(disk)
dt = 1
t_end = 10
t = 0
# Evolve!
while t < t_end:
print ""
print "t=", t
processes = [multiprocessing.Process(target=evolve_single_disk, args=(code_queue, dt, )) for x in range(ncores)]
for p in processes:
p.start()
p.join()
disk_codes = [code_queue.get() for p in processes]
print "disk codes: ", disk_codes
t += dt

这导致:

t= 0
Empty queue?  False
Evolving disk 0 in Process-1
Empty queue?  False
Evolving disk 0 in Process-2
Empty queue?  False
Evolving disk 0 in Process-3
Empty queue?  False
Evolving disk 0 in Process-4
disk codes:  [0, 1, 2, 3]
t= 1
Empty queue?  True

。然后代码就挂起了。所以在这里,我在每个时间步中启动 4 个进程,但每个进程都接收完全相同的磁盘。

我怎样才能正确地做到这一点,以便在每个时间步上有 4 个进程,每个进程演变 1 个单个磁盘?我已经阅读了文档和许多教程以及SO问题/答案,但我仍然感到非常困惑。

编辑:

我尝试使用multiprocessing队列,但是当我尝试将磁盘代码放入队列时,我遇到了TypeError。可悲的是,磁盘代码也不可腌制。使用带有磁盘代码的队列multiprocessing回溯:

t= 0
<multiprocessing.queues.Queue object at 0x7fee4a2d3950>
Traceback (most recent call last):
File "/home/fran/anaconda2/lib/python2.7/multiprocessing/queues.py", line 268, in _feed
send(obj)
TypeError: expected string or Unicode object, NoneType found
Traceback (most recent call last):
File "/home/fran/anaconda2/lib/python2.7/multiprocessing/queues.py", line 268, in _feed
send(obj)
TypeError: expected string or Unicode object, NoneType found
Traceback (most recent call last):
File "/home/fran/anaconda2/lib/python2.7/multiprocessing/queues.py", line 268, in _feed
send(obj)
TypeError: expected string or Unicode object, NoneType found
Traceback (most recent call last):
File "/home/fran/anaconda2/lib/python2.7/multiprocessing/queues.py", line 268, in _feed
send(obj)
TypeError: expected string or Unicode object, NoneType found
Empty queue?  True

在while 循环中移动它:

for disk in disk_codes:
code_queue.put(disk)

以下是完整的代码:

import multiprocessing
def evolve_single_disk(queue, result, dt):
print "Empty queue? ", queue.empty()
code = queue.get()
print "Evolving disk {0} in {1}".format(code, multiprocessing.current_process().name)
result.put(code)

if __name__ == '__main__':
ncores = 4
code_queue = multiprocessing.Queue()
result_queue = multiprocessing.Queue()
disk_codes = range(ncores)
dt = 1
t_end = 10
t = 0
# Evolve!
while t < t_end:
for disk in disk_codes:
code_queue.put(disk)
print ""
print "t=", t
process_list = list()
for x in range(ncores):
process = multiprocessing.Process(target=evolve_single_disk, args=(code_queue, result_queue, dt))
process_list.append(process)
for p in process_list:
p.start()
p.join()
disk_codes = [result_queue.get() for p in process_list]
print "disk codes: ", disk_codes
t += dt

输出


t= 0
Empty queue?  False
Evolving disk 0 in Process-1
Empty queue?  False
Evolving disk 1 in Process-2
Empty queue?  False
Evolving disk 2 in Process-3
Empty queue?  False
Evolving disk 3 in Process-4
disk codes:  [0, 1, 2, 3]
t= 1
Empty queue?  False
Evolving disk 0 in Process-5
Empty queue?  False
Evolving disk 1 in Process-6
Empty queue?  False
Evolving disk 2 in Process-7
Empty queue?  False
Evolving disk 3 in Process-8
disk codes:  [0, 1, 2, 3]
...
t= 9
Empty queue?  False
Evolving disk 0 in Process-37
Empty queue?  False
Evolving disk 1 in Process-38
Empty queue?  False
Evolving disk 2 in Process-39
Empty queue?  False
Evolving disk 3 in Process-40
disk codes:  [0, 1, 2, 3]

相关内容

  • 没有找到相关文章