我正在尝试多处理,我想在Python的主线程和带有队列的子进程之间进行通信。下面是我写的一个快速测试代码,它应该会定期得到子流程生成的一些结果:
from multiprocessing import Process, Queue
import time
def calculate(queue):
n = 0
while n < 10:
n += 1
queue.put(n)
time.sleep(1)
queue.put(0)
def queue_getter(queue):
executing = True
while executing:
while queue.qsize():
n = queue.get()
print(n)
if n == 0:
executing = False
time.sleep(0.1)
print('done')
queue = Queue()
p = Process(target=calculate, args=(queue,))
p.start()
queue_getter(queue)
p.join()
print('DONE')
这个程序只是永远挂起,而用threading.Thread
替换Process
会得到预期的结果:
1
2
3
4
5
6
7
8
9
10
0
done
DONE
在这种情况下,如何使进程的行为与线程相同?
您的程序在POSIX(类UNIX(系统上运行良好。
然而,为了让它在ms windows和macOS上正常工作,你需要将程序本身放在一个主块中,这样文件就可以在没有副作用的情况下导入。
这是由于multiprocessing
必须在ms窗口和macOS上工作的方式。阅读多处理的编程指南。
修改你的代码如下:
from multiprocessing import Process, Queue
import time
def calculate(queue):
n = 0
while n < 10:
n += 1
queue.put(n)
time.sleep(1)
queue.put(0)
def queue_getter(queue):
executing = True
while executing:
while queue.qsize():
n = queue.get()
print(n)
if n == 0:
executing = False
time.sleep(0.1)
print("done")
if __name__ == "__main__":
queue = Queue()
p = Process(target=calculate, args=(queue,))
p.start()
queue_getter(queue)
p.join()
print("DONE")
这里有一种简化且更稳健的方法,它在功能上(几乎(与OP的原始方法相同,只是不打印零:
from multiprocessing import Manager
from concurrent.futures import ProcessPoolExecutor
import time
def calculate(q):
for n in range(1, 11):
q.put(n)
time.sleep(1)
q.put(0)
def queue_getter(q):
while (n := q.get()):
print(n)
def main():
with Manager() as manager:
q = manager.Queue()
with ProcessPoolExecutor() as executor:
executor.submit(calculate, q)
queue_getter(q)
if __name__ == '__main__':
main()