When using a Python multiprocessing pool, how many jobs get submitted?
How is that decided? Can we control it somehow, e.g. keep at most 10 jobs in the queue, in order to reduce memory usage?
Suppose I have the skeleton code below. For each chrom and each simulation I read the data into a pandas DataFrame.
(I think reading the data before submitting the job is better, to reduce the I/O load inside the worker processes.)
Then I send the pandas DataFrame to each worker for processing.
However, it seems that far more jobs are submitted than are finalized, and this results in a memory error.
import multiprocessing

numofProcesses = multiprocessing.cpu_count()
pool = multiprocessing.Pool(processes=numofProcesses)

jobs = []
all_result1 = {}
all_result2 = {}

def accumulate(result):
    # callback, runs in the main process: merge the partial results
    result1 = result[0]
    result2 = result[1]
    all_result1.update(result1)
    all_result2.update(result2)
    print('ACCUMULATE')

for chrom in chroms:       # chroms/sims: the chromosomes and simulations to process
    for sim in sims:
        # read the data in the main process to keep I/O out of the workers
        chrBased_simBased_df = readData(chrom, sim)
        jobs.append(pool.apply_async(func, args=(chrBased_simBased_df, too, many,), callback=accumulate))
        print('Submitted job:%d' % (len(jobs)))

pool.close()
pool.join()
Is there a way to get around this?
Neither multiprocessing.Pool nor concurrent.futures.ProcessPoolExecutor allows limiting the amount of tasks submitted to the workers. Nevertheless, this is a fairly trivial extension which you can build yourself using a Semaphore. You can check an example in this gist. It uses the concurrent.futures module, but porting it to multiprocessing.Pool should be simple as well (see the sketch after the example).
from threading import BoundedSemaphore
from concurrent.futures import ProcessPoolExecutor


class MaxQueuePool:
    """This Class wraps a concurrent.futures.Executor
    limiting the size of its task queue.
    If `max_queue_size` tasks are submitted, the next call to submit will block
    until a previously submitted one is completed.
    """
    def __init__(self, executor, max_queue_size, max_workers=None):
        self.pool = executor(max_workers=max_workers)
        self.pool_queue = BoundedSemaphore(max_queue_size)

    def submit(self, function, *args, **kwargs):
        """Submits a new task to the pool, blocks if Pool queue is full."""
        self.pool_queue.acquire()

        future = self.pool.submit(function, *args, **kwargs)
        future.add_done_callback(self.pool_queue_callback)

        return future

    def pool_queue_callback(self, _):
        """Called once task is done, releases one queue slot."""
        self.pool_queue.release()


if __name__ == '__main__':
    pool = MaxQueuePool(ProcessPoolExecutor, 8)
    f = pool.submit(print, "Hello World!")
    f.result()
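
For completeness, here is a minimal sketch of the same bounded-queue idea ported to multiprocessing.Pool, since the question uses apply_async with a callback. The class name MaxQueuePool is reused from the example above, and the error_callback handling is my own addition so a failing task still releases its queue slot; treat it as a sketch rather than a drop-in library.

from threading import BoundedSemaphore
import multiprocessing


class MaxQueuePool:
    """Wraps multiprocessing.Pool, limiting the number of queued tasks.
    If `max_queue_size` tasks are pending, the next call to apply_async
    blocks until one of them completes.
    """
    def __init__(self, max_queue_size, processes=None):
        self.pool = multiprocessing.Pool(processes=processes)
        self.pool_queue = BoundedSemaphore(max_queue_size)

    def apply_async(self, function, args=(), kwds={}, callback=None):
        """Submits a new task, blocking if the queue is full."""
        self.pool_queue.acquire()

        def release_and_forward(result):
            # free one queue slot, then run the user's callback (if any)
            self.pool_queue.release()
            if callback is not None:
                callback(result)

        # error_callback releases the slot too, so a failing task
        # cannot leave the submitter blocked forever
        return self.pool.apply_async(
            function, args, kwds,
            callback=release_and_forward,
            error_callback=lambda exc: self.pool_queue.release(),
        )

    def close(self):
        self.pool.close()

    def join(self):
        self.pool.join()

In the skeleton from the question you would then create the pool as pool = MaxQueuePool(10, processes=numofProcesses) and keep calling pool.apply_async(func, args=(...), callback=accumulate) exactly as before; at most 10 DataFrames wait in the queue at any time, so memory stays bounded.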