使用管道在池中的工作线程之间进行通信



我想使用管道在从多处理池生成的工作进程之间进行通信。 我正在尝试将管道作为iterable传递给池,但代码始终挂起。

这是挂起的代码。它非常简单,实际上甚至不使用管道(尽管它们被传递给 worker 函数)。

import os
import multiprocessing as mp
from multiprocessing import Pool
def worker(d):
j,p = d      # Notice that p (a pipe) is never used!)
pid = os.getpid()
msg = "Greetings from job {} ({})".format(j,pid)
print(msg)
return (j,pid)
# Main program
np = 4
pipes_0,pipes_1 = zip(*[mp.Pipe() for i in range(np)])
data = zip(range(np),pipes_1)  # Doesn't work (even though pipes not used)
# data = zip(range(np),range(np))  # Works
pool = Pool(processes=np)
results_async = pool.map_async(func=worker, iterable=data)
results = results_async.get()
print(results)

当管道作为压缩可迭代对象的一部分传递时,输出通常挂起以下输出:

Greetings from job 0 (35824)
Greetings from job 1 (35825)
(code usually hangs here....)

奇怪的是,我没有使用上述代码中的管道,因此似乎池内正在发生一些事情,它期望从管道中获取某些东西。

如果我不包括管道作为传递给worker的数据的一部分,(使用data的注释定义)代码按预期工作并产生

Greetings from job 0 (35865)
Greetings from job 1 (35866)
Greetings from job 2 (35867)
Greetings from job 3 (35868)
[(0, 35865), (1, 35866), (2, 35867), (3, 35868)]

作为比较点,在这两种情况下,显式分叉进程(使用mp.Process而不是池)的类似代码都按预期工作。

实际上,此代码使用函数内部的管道,并且运行良好。

import os
import multiprocessing as mp
def worker(d):
j,p = d
pid = os.getpid()
p.send("Greetings from job {} ({})".format(j,pid))
# Main program
np = 4
pipes_0,pipes_1 = zip(*[mp.Pipe() for i in range(np)])
data = zip(range(np),pipes_1)
jobs = []
for d in data:
p = mp.Process(target=worker,args=[d])
p.start()
jobs.append(p)
for p0 in pipes_0:
print("{:s}".format(p0.recv()))
for j in jobs:
j.join()
print("Done")

生成预期的输出。

Greetings from job 0 (35834)
Greetings from job 1 (35835)
Greetings from job 2 (35836)
Greetings from job 3 (35837)
Done

最初,我以为 通过显式启动进程,我很幸运地避免了任何死锁,并且池使用的更复杂的执行计划在启动作业时引入了足够的滞后,从而导致死锁。

但这并不能解释为什么池代码不起作用,即使根本没有引用管道也是如此。

我正在运行 OSX 10.13.2, Python 3.6.3 |蟒蛇自定义(64位)|

任何见解都会非常有帮助!

不是和这里一样吗? 将管道/连接作为上下文参数传递给多处理 Pool.apply_async()

我猜您没有收到那里提到的错误消息,因为Mac OS。

链接中的答案说这是Python 2的错误。我用Python 3尝试了你的代码,它有效。

这个问题是早期 Python 2.x 版本中的一个错误,并且已经有几篇关于这个问题的文章。 不过,据说该错误在Python 3.3中得到了修复。 但是,我在OSX上运行Python 3.6,并且我的代码挂起。

作为比较,我运行了此处发布的代码 结果是相似的。 在该帖子的第二个代码中,队列作为参数传递给池工作线程。 这段代码在Linux(Anaconda 3.5)和OSX(Anaconda 3.6)上都挂在了我身上。

奇怪的是,我的代码运行在Linux版本的Anaconda上。 管道好,队列坏?

我开始喜欢游泳池了。

相关内容

  • 没有找到相关文章

最新更新