Hanging on pool.join() while processing queues asynchronously



The Python documentation says that if maxsize is less than or equal to zero, the queue size is infinite. I also tried maxsize=-1. However, that is not the case and the program hangs. As a workaround I created multiple Queues to use. But this is not ideal, because as the lists I need to process get bigger, I have to create more and more Queue() objects and add extra code to handle their elements.

import multiprocessing
from multiprocessing import Queue

queue = Queue(maxsize=0)
queue2 = Queue(maxsize=0)
queue3 = Queue(maxsize=0)

PROCESS_COUNT = 6

def filter(aBigList):
    list_chunks = list(chunks(aBigList, PROCESS_COUNT))
    pool = multiprocessing.Pool(processes=PROCESS_COUNT)
    for chunk in list_chunks:
        pool.apply_async(func1, (chunk,))
    pool.close()
    pool.join()
    allFiltered = []
    # list of dicts
    while not queue.empty():
        allFiltered.append(queue.get())
    while not queue2.empty():
        allFiltered.append(queue2.get())
    while not queue3.empty():
        allFiltered.append(queue3.get())
    # do work with allFiltered

def func1(subList):
    SUBLIST_SPLIT = 3
    theChunks = list(chunks(subList, SUBLIST_SPLIT))
    for i in theChunks[0]:
        dictQ = updateDict(i)
        queue.put(dictQ)
    for x in theChunks[1]:
        dictQ = updateDict(x)
        queue2.put(dictQ)
    for y in theChunks[2]:
        dictQ = updateDict(y)
        queue3.put(dictQ)
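Note that chunks is not shown in the question. Since func1 splits its sub-list with SUBLIST_SPLIT = 3 and then indexes theChunks[0] through theChunks[2], it presumably splits a list into n roughly equal parts. A minimal sketch of such a helper, as an assumption:

def chunks(lst, n):
    # Assumed behaviour: split lst into n roughly equal consecutive parts
    k, m = divmod(len(lst), n)
    return [lst[i * k + min(i, m):(i + 1) * k + min(i + 1, m)]
            for i in range(n)]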

This problem occurs because you do not process the Queue before the join call. When you use a multiprocessing.Queue, you should empty it before trying to join the feeding processes: a Process waits for all the objects it put in the Queue to be flushed before terminating. I don't know why this happens even with Queues of large maxsize, but it is probably related to the fact that the underlying os.pipe buffer is not big enough. So putting your get calls before pool.join should solve your problem.
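To see this behaviour in isolation, here is a minimal sketch (not from the original code) that reproduces the hang with a single Process, and shows that draining the queue first avoids it:

import multiprocessing

def worker(q):
    # Put far more data than the underlying pipe buffer can hold
    # (typically around 64 KiB on Linux), so the feeder thread blocks
    for _ in range(10000):
        q.put("x" * 1000)

if __name__ == "__main__":
    q = multiprocessing.Queue()
    p = multiprocessing.Process(target=worker, args=(q,))
    p.start()
    # Calling p.join() right here would hang: the process cannot exit
    # until its queued data has been flushed. Drain first, then join.
    for _ in range(10000):
        q.get()
    p.join()  # returns promptly once the queue has been emptied

Applied to your code, that means reading the results before joining the pool: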

import multiprocessing

PROCESS_COUNT = 6

def filter(aBigList):
    list_chunks = list(chunks(aBigList, PROCESS_COUNT))
    pool = multiprocessing.Pool(processes=PROCESS_COUNT)
    # Use a Manager queue here: a plain multiprocessing.Queue cannot
    # be pickled and passed as an argument to pool workers
    manager = multiprocessing.Manager()
    result_queue = manager.Queue()
    async_result = []
    for chunk in list_chunks:
        async_result.append(pool.apply_async(
            func1, (chunk, result_queue)))
    all_filtered = []
    done = 0
    # Each worker sends a None sentinel when it is finished
    while done < len(list_chunks):
        res = result_queue.get()
        if res is None:
            done += 1
        else:
            all_filtered.append(res)
    pool.close()
    pool.join()
    # do work with all_filtered

def func1(sub_list, result_queue):
    # mapping function
    for i in sub_list:
        result_queue.put(updateDict(i))
    result_queue.put(None)

But one question remains: why do you need to handle the communication yourself at all? If you refactor func1 to simply return its results, you can let the Pool manage it for you:

import multiprocessing

PROCESS_COUNT = 6

def filter(aBigList):
    list_chunks = list(chunks(aBigList, PROCESS_COUNT))
    pool = multiprocessing.Pool(processes=PROCESS_COUNT)
    async_result = []
    for chunk in list_chunks:
        async_result.append(pool.apply_async(func1, (chunk,)))
    pool.close()
    pool.join()
    # Reduce the results: each AsyncResult holds one chunk's list,
    # so flatten them into a single list
    allFiltered = [item for res in async_result for item in res.get()]
    # do work with allFiltered

def func1(sub_list):
    # mapping function
    results = []
    for i in sub_list:
        results.append(updateDict(i))
    return results

This avoids this whole class of errors: there is no queue to drain by hand, the results come back in submission order, and any exception raised inside func1 is re-raised when you call res.get() instead of hanging silently.

EDIT: Finally, you can reduce the code even further by using the Pool.map function, which also takes care of the chunking. If your chunks get too big, you can get errors while pickling the results (as mentioned in your comments), so you can use map with an adapted chunksize to keep them small:

import multiprocessing

PROCESS_COUNT = 6

def filter(aBigList):
    # Run in parallel the internal mapping of mp.Pool, which applies
    # updateDict to chunks of 100 items from aBigList and returns the
    # results. The map function takes care of the chunking, the
    # dispatching, and collecting the items in the right order.
    with multiprocessing.Pool(processes=PROCESS_COUNT) as pool:
        allFiltered = pool.map(updateDict, aBigList, chunksize=100)
    # do work with allFiltered
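For completeness, a hypothetical end-to-end sketch; updateDict below is a stand-in, since the question does not show it. The __main__ guard matters with multiprocessing: under the spawn start method (the default on Windows and macOS) the module is re-imported in every worker process.

def updateDict(item):
    # Hypothetical stand-in for the question's updateDict
    return {"value": item, "filtered": True}

if __name__ == "__main__":
    filter(list(range(1000)))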
