Problem emptying a queue



Question

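I'm new to multiprocessing, and whatever I try gets me nowhere. Every time I work around one problem, I run into a new obstacle. My goal is to load a queue using multiple processes and then use multiple processes to pull items from the queue and process them. I've tried scaling back to just basic queue handling, but as soon as I introduce multiple processes I can't get anything out of the queue. What am I missing?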

Code

from multiprocessing import Process, Lock
from queue import Queue
import os
q = Queue(5)

def get_from_q():
    print('trying to get')
    print(q.get())

if __name__ == '__main__':
    # put items at the end of the queue
    for x in range(6):
        print('adding ', x)
        q.put(x)
    PROCESSOR_COUNT = os.cpu_count()
    processes = []
    for p in range(PROCESSOR_COUNT):
        print('spawning process')
        p = Process(target=get_from_q)
        processes.append(p)
    for p in processes:
        print('starting')
        p.start()
    for p in processes:
        print('joining')
        p.join()

Actual result:

adding 0
adding 1
adding 2
adding 3
adding 4
adding 5

Expected result

adding 0
adding 1
adding 2
adding 3
adding 4
adding 5
spawning process
spawning process
spawning process
spawning process
starting
starting
starting
starting
trying to get 
0
trying to get 
1 
trying to get 
2 
trying to get 
3 
trying to get 
4
trying to get 
5
joining
joining
joining
joining

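If you are running on a platform that uses spawn to create new processes, a new process does not inherit the main process's address space. Instead, its address space is initialized by re-executing the program's top-level code from the start (everything outside the if __name__ == '__main__': block). So whatever you define at global scope is executed again, for example this line in your code: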

q = Queue(5)

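That line is therefore executed by every process you create, so each process ends up with its own copy of q. That cannot work. You need to create q once and pass it to each process as an argument, and it has to be a multiprocessing.Queue rather than a queue.Queue, which only works between threads of a single process. I also added flush=True to the print calls to reduce the chance that output from the different processes gets interleaved.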

from multiprocessing import Process, Lock, Queue
import os

def get_from_q(q):
    print('trying to get', q.get(), flush=True)

if __name__ == '__main__':
    PROCESSOR_COUNT = os.cpu_count()
    q = Queue(PROCESSOR_COUNT) # or put no size limitation on this
    # put items at the end of the queue
    for x in range(PROCESSOR_COUNT):
        print('adding ', x)
        q.put(x)
    processes = []
    for p in range(PROCESSOR_COUNT):
        print('spawning process')
        p = Process(target=get_from_q, args=(q,))
        processes.append(p)
    for p in processes:
        print('starting', flush=True)
        p.start()
    for p in processes:
        print('joining', flush=True)
        p.join()

Prints:

adding  0
adding  1
adding  2
adding  3
adding  4
adding  5
adding  6
adding  7
spawning process
spawning process
spawning process
spawning process
spawning process
spawning process
spawning process
spawning process
starting
starting
starting
starting
starting
starting
starting
starting
joining
trying to get 0
trying to get 1
trying to get 2
trying to get 3
trying to get 4
trying to get 5
trying to get 6
joining
joining
joining
trying to get 7
joining
joining
joining
joining
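
A separate detail probably explains why the original script printed nothing after adding 5: the queue was created with Queue(5), but the loop puts six items on it. A put on a full bounded queue blocks until a slot frees up, and nothing was consuming yet, so the script never reached the spawning code. The sketch below reproduces this with queue.Queue. The timeout is my addition for illustration only, so that the sixth put raises queue.Full instead of hanging as it does in the question.

```python
import queue

q = queue.Queue(5)   # bounded to 5 items, as in the question
added = []

try:
    for x in range(6):
        # Illustration only: the question calls q.put(x) with no timeout,
        # so its sixth call waits forever instead of raising queue.Full.
        q.put(x, timeout=0.1)
        added.append(x)
except queue.Full:
    print('queue full after adding', added)
    # prints: queue full after adding [0, 1, 2, 3, 4]
```

A bounded multiprocessing.Queue blocks in the same way, which is why the code above puts exactly PROCESSOR_COUNT items on a queue of that size.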

Using a process pool

Here the queue is hidden inside the pool implementation:

from multiprocessing import Pool, cpu_count

def worker(x):
    print('x =', x, flush=True)
    return x ** 2

if __name__ == '__main__':
    PROCESSOR_COUNT = cpu_count()
    pool = Pool(PROCESSOR_COUNT)
    print(pool.map(worker, range(PROCESSOR_COUNT)))

Prints:

x = 0
x = 1
x = 2
x = 3
x = 4
x = 5
x = 6
x = 7
[0, 1, 4, 9, 16, 25, 36, 49]
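
If you still want explicit queues, for instance for the layout described in the question (several processes filling a queue and several draining it), the following is one possible sketch rather than a definitive implementation. The names produce, consume, run and SENTINEL are hypothetical, and squaring each item simply mirrors worker above. It assumes one sentinel value per consumer is an acceptable way to signal "no more work".

```python
from multiprocessing import Process, Queue

SENTINEL = None  # hypothetical "no more work" marker


def produce(q, items):
    """Put every item on the shared queue."""
    for item in items:
        q.put(item)


def consume(in_q, out_q):
    """Square items from in_q until a sentinel arrives, then report done."""
    while True:
        item = in_q.get()
        if item is SENTINEL:
            break
        out_q.put(item * item)
    out_q.put(SENTINEL)  # tells the parent that this consumer has finished


def run(n_producers=2, n_consumers=2, per_producer=3):
    work_q, result_q = Queue(), Queue()  # created once, passed as arguments
    producers = [
        Process(target=produce,
                args=(work_q, range(i * per_producer, (i + 1) * per_producer)))
        for i in range(n_producers)
    ]
    consumers = [Process(target=consume, args=(work_q, result_q))
                 for _ in range(n_consumers)]
    for p in producers + consumers:
        p.start()
    for p in producers:
        p.join()                 # all work has been queued once these exit
    for _ in consumers:
        work_q.put(SENTINEL)     # one sentinel per consumer
    results, finished = [], 0
    while finished < n_consumers:  # drain results before joining consumers
        item = result_q.get()
        if item is SENTINEL:
            finished += 1
        else:
            results.append(item)
    for p in consumers:
        p.join()
    return sorted(results)


if __name__ == '__main__':
    print(run())  # prints: [0, 1, 4, 9, 16, 25]
```

The results are sorted because the order in which the consumers finish is not deterministic. The result queue is drained before the consumers are joined so that no consumer is left waiting to flush items it has queued.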
