multiprocessing.SimpleQueue hangs when given too many items



I'm trying to compare the performance of multiprocessing.SimpleQueue and multiprocessing.Queue, so I wrote the following program:

import multiprocessing as MP
import time

def worker_func(job_queue: MP.Queue, result_queue: MP.Queue):
    count = 0
    while True:
        job = job_queue.get()
        result_queue.put(f"Got job {job}")
        if job == "DIE":
            result_queue.put("FIN")
            print("X", end="", flush=True)
            break
        if count % 100 == 0:
            print("+", end="", flush=True)
        count += 1
    job_queue.close()
    result_queue.close()

def test_queue(queue_factory, job_num: int = 100_000):
    workers_count = 2
    workers = []
    jobq = queue_factory()
    resq = queue_factory()
    for _ in range(0, workers_count):
        p = MP.Process(target=worker_func, args=(jobq, resq))
        p.start()
        workers.append(p)
    time.sleep(1.0)
    start_t = time.monotonic()
    print(f"Sending {job_num:,} jobs ... ", end="", flush=True)
    for i in range(0, job_num):
        if i % 1000 == 0:
            print(".", end="", flush=True)
        jobq.put(i)
    print("Jobs sent, killing ... ", end="", flush=True)
    for _ in workers:
        jobq.put("DIE")
    print("Signals sent ... ", end="", flush=True)
    while not jobq.empty():
        time.sleep(0.1)
    print("jobq emptied ... ", end="", flush=True)
    finished = 0
    while not resq.empty() or finished < workers_count:
        if resq.get() == "FIN":
            finished += 1
    jobq.close()
    resq.close()
    elapsed_t = time.monotonic() - start_t
    for w in workers:
        w.join()
    print(f" Elapsed time: {elapsed_t:,.2f}")
    return elapsed_t

if __name__ == '__main__':
    joblen = 10_000
    print("MP.Queue")
    for _ in range(0, 5):
        test_queue(MP.Queue, joblen)
    print("MP.SimpleQueue")
    for _ in range(0, 5):
        test_queue(MP.SimpleQueue, joblen)

If I set joblen to 100 or 200, the program works.

But if I set joblen to, say, 10_000, it gets stuck. Here is the output:

MP.Queue
Sending 10,000 jobs ... .+.........Jobs sent, killing ... Signals sent ... jobq emptied ... ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++XX Elapsed time: 0.61
Sending 10,000 jobs ... .+...+......Jobs sent, killing ... Signals sent ... +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++XXjobq emptied ...  Elapsed time: 0.88
Sending 10,000 jobs ... .++.........Jobs sent, killing ... Signals sent ... jobq emptied ... +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++XX Elapsed time: 0.66
Sending 10,000 jobs ... .+..+.......Jobs sent, killing ... Signals sent ... +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++XXjobq emptied ...  Elapsed time: 0.75
Sending 10,000 jobs ... .+.........Jobs sent, killing ... Signals sent ... jobq emptied ... ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++XX Elapsed time: 0.61
MP.SimpleQueue
Sending 10,000 jobs ... .++++

As you can see, the main process seems to get stuck while trying to .put() an item into the SimpleQueue.

Is there an inherent limit on the number of outstanding (unconsumed) items in a SimpleQueue? Or am I doing something wrong?

After a lot of searching, I found Bug 41550.

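SimpleQueue seems to be built on an os.pipe with limited capacity (64 KiB, IIRC), so put() blocks as soon as that pipe is full. In this test, nothing reads resq until all jobs have been sent: once the results pipe fills up, the workers block in resq.put() and stop taking jobs, then the job pipe fills up as well and the main process blocks in jobq.put(). MP.Queue avoids this because its put() only appends the item to an internal buffer, and a background feeder thread writes it to the pipe.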
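To make the pipe limit visible, the sketch below fills a bare os.pipe() without ever reading from it and counts how many bytes fit before a write would block. SimpleQueue sends each pickled item through a pipe of this kind, so once this many bytes of unread results pile up, put() waits. It is a minimal sketch that assumes a Unix-like OS; pipe_capacity is a hypothetical helper name, and the number it reports is platform-dependent (commonly 65,536 bytes on Linux).

```python
import os


def pipe_capacity(chunk_size: int = 1024) -> int:
    """Hypothetical helper: count bytes an OS pipe accepts before a write would block."""
    r, w = os.pipe()
    # Non-blocking write end: a full pipe raises BlockingIOError instead of hanging,
    # which is what a blocking put() would otherwise do.
    os.set_blocking(w, False)
    chunk = b"x" * chunk_size
    total = 0
    try:
        while True:
            try:
                total += os.write(w, chunk)  # nobody reads from r, so data accumulates
            except BlockingIOError:
                break  # pipe buffer is full
    finally:
        os.close(r)
        os.close(w)
    return total


if __name__ == "__main__":
    print(f"pipe buffer holds about {pipe_capacity():,} bytes")
```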

My solution was to add one more worker whose only job is to consume resq.
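A minimal sketch of that workaround follows. A dedicated collector process keeps reading resq while the main process is still sending jobs, so the results pipe never fills and no one stays blocked in put(). Assumptions: it uses the "fork" start method (so it needs a Unix-like OS), worker is a stripped-down version of worker_func from the question, and collector, doneq and run are hypothetical names introduced for illustration.

```python
import multiprocessing as MP

# Assumption: a Unix-like OS where the "fork" start method is available.
CTX = MP.get_context("fork")


def worker(jobq, resq):
    """Simplified worker_func: one result per job, "FIN" on the "DIE" sentinel."""
    while True:
        job = jobq.get()
        if job == "DIE":
            resq.put("FIN")
            return
        resq.put(f"Got job {job}")


def collector(resq, n_workers, doneq):
    """Hypothetical extra worker: drain resq continuously so workers never block."""
    finished = 0
    received = 0
    while finished < n_workers:
        if resq.get() == "FIN":
            finished += 1
        else:
            received += 1
    doneq.put(received)  # report how many results were consumed


def run(job_num: int, n_workers: int = 2) -> int:
    jobq, resq, doneq = CTX.SimpleQueue(), CTX.SimpleQueue(), CTX.SimpleQueue()
    procs = [CTX.Process(target=worker, args=(jobq, resq)) for _ in range(n_workers)]
    procs.append(CTX.Process(target=collector, args=(resq, n_workers, doneq)))
    for p in procs:
        p.start()
    for i in range(job_num):
        jobq.put(i)  # no longer stalls: the collector keeps resq's pipe drained
    for _ in range(n_workers):
        jobq.put("DIE")  # one sentinel per worker, queued after all jobs
    received = doneq.get()
    for p in procs:
        p.join()
    return received


if __name__ == "__main__":
    print(run(10_000))
```

Because the sentinels are queued after every job, each worker stops only once all jobs have been taken, so run(job_num) returns job_num. Draining resq from a thread in the main process would be another option; this sketch sticks to the extra-process approach described above.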
