Multiprocessing pool and generators



First check the following code:

import multiprocessing

pool = multiprocessing.Pool(processes=N)
batch = []
for item in generator():
    batch.append(item)
    if len(batch) == 10:
        pool.apply_async(my_fun, args=(batch,))
        batch = []
# leftovers
pool.apply_async(my_fun, args=(batch,))

Essentially, I retrieve data from a generator, collect it into a list, and then spawn a process that consumes the batch of data.

This may look fine, but when the consumers (a.k.a. the pool processes) are slower than the producer (a.k.a. the generator), memory usage of the main process grows until the generator stops or... the system runs out of memory.

How can I avoid this problem?

In this case you may want to use a size-limited queue.

q = multiprocessing.Queue(maxSize)

When used with a maximum size, this will give you the necessary counting and block the thread that is calling q.put() when the queue is full, so you can never post more than a certain number of work items on it, which limits the memory needed to store the pending items.
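
For illustration, a minimal runnable sketch of this approach, assuming explicitly started worker processes instead of a Pool, a None sentinel to shut the workers down, and arbitrary stand-ins (range(100) for generator(), a trivial my_fun, the N and maxSize values):

import multiprocessing

N = 4            # number of worker processes (assumed)
maxSize = 8      # maximum number of batches allowed to wait in the queue (assumed)

def my_fun(batch):
    return sum(batch)            # stand-in for the real work

def worker(q):
    # consume batches until the sentinel arrives
    while True:
        batch = q.get()
        if batch is None:
            break
        my_fun(batch)

if __name__ == '__main__':
    q = multiprocessing.Queue(maxSize)
    workers = [multiprocessing.Process(target=worker, args=(q,))
               for _ in range(N)]
    for w in workers:
        w.start()
    batch = []
    for item in range(100):      # stand-in for generator()
        batch.append(item)
        if len(batch) == 10:
            q.put(batch)         # blocks while maxSize batches are pending
            batch = []
    if batch:                    # leftovers
        q.put(batch)
    for _ in workers:            # one sentinel per worker
        q.put(None)
    for w in workers:
        w.join()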

Alternatively, you can use a counting semaphore (e.g., multiprocessing.BoundedSemaphore(maxSize)). Acquire it each time you fetch a work item from the generator, and release it in the work function (my_fun) once the item has been processed. That way, the maximum number of work items waiting to be processed never exceeds the initial value of the semaphore.
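
A minimal sketch of the semaphore variant, keeping the Pool from the question. Since a multiprocessing semaphore cannot be passed as a task argument, here it is handed to the workers through the pool initializer so that my_fun can release it; max_pending, the pool size, and the range(100) stand-in for generator() are assumptions for illustration:

import multiprocessing

max_pending = 8                        # assumed limit on in-flight batches

def init_worker(semaphore):
    # make the semaphore passed at pool start-up visible to my_fun
    global sem
    sem = semaphore

def my_fun(batch):
    try:
        return sum(batch)              # stand-in for the real work
    finally:
        sem.release()                  # free one slot once the batch is done

if __name__ == '__main__':
    sem = multiprocessing.BoundedSemaphore(max_pending)
    pool = multiprocessing.Pool(processes=4,
                                initializer=init_worker, initargs=(sem,))
    batch = []
    for item in range(100):            # stand-in for generator()
        batch.append(item)
        if len(batch) == 10:
            sem.acquire()              # blocks while max_pending batches are pending
            pool.apply_async(my_fun, args=(batch,))
            batch = []
    if batch:                          # leftovers
        sem.acquire()
        pool.apply_async(my_fun, args=(batch,))
    pool.close()
    pool.join()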

Chunk the data from your generator with the grouper itertools recipe.

Use the infrastructure in concurrent.futures to handle task submission and result retrieval with the processes.

You could

  • submit a group of tasks; wait for them to complete; then submit another group, or
  • keep the pipeline full by submitting a new task whenever one completes.

Setup (an attempt to mimic your process):

import concurrent.futures
import itertools, time, collections, random
from pprint import pprint

# from itertools recipes
def grouper(iterable, n, fillvalue=None):
    "Collect data into fixed-length chunks or blocks"
    # grouper('ABCDEFG', 3, 'x') --> ABC DEF Gxx
    args = [iter(iterable)] * n
    return itertools.zip_longest(*args, fillvalue=fillvalue)

# generator/iterator facsimile
class G:
    '''Long-winded range(n)'''
    def __init__(self, n=108):
        self.n = n
        self.a = []
    def __iter__(self):
        return self
    def __next__(self):
        #self.a.append(time.perf_counter())
        if self.n < 0:
            raise StopIteration
        x = self.n
        self.n -= 1
        return x

def my_func(*args):
    time.sleep(random.randint(1,10))
    return sum(*args)

Wait for a group of tasks to complete

if __name__ == '__main__':
    nworkers = 4
    g = G()
    # generate data three-at-a-time
    data = grouper(g, 3, 0)
    results = []
    fs = []
    with concurrent.futures.ProcessPoolExecutor(max_workers=nworkers) as executor:
        for args in data:
            print(f'pending:{len(executor._pending_work_items)}')
            # block submission - limit pending tasks to conserve resources (memory)
            if len(executor._pending_work_items) == nworkers:
                # wait till all complete and get the results
                futures = concurrent.futures.wait(fs, return_when=concurrent.futures.ALL_COMPLETED)
                #print(futures)
                results.extend(future.result() for future in futures.done)
                fs = list(futures.not_done)
            # add a new task
            fs.append(executor.submit(my_func, args))
        # data exhausted - get leftover results as they finish
        for future in concurrent.futures.as_completed(fs):
            print(f'pending:{len(executor._pending_work_items)}')
            result = future.result()
            results.append(result)
    pprint(results)

Keep the process pool full

if __name__ == '__main__':
    nworkers = 4
    g = G()
    # generate data three-at-a-time
    data = grouper(g, 3, 0)
    results = []
    fs = []
    with concurrent.futures.ProcessPoolExecutor(max_workers=nworkers) as executor:
        for args in data:
            print(f'pending:{len(executor._pending_work_items)}')
            # block submission - limit pending tasks to conserve resources (memory)
            if len(executor._pending_work_items) == nworkers:
                # wait till one completes and get the result
                futures = concurrent.futures.wait(fs, return_when=concurrent.futures.FIRST_COMPLETED)
                #print(futures)
                results.extend(future.result() for future in futures.done)
                fs = list(futures.not_done)
            # add a new task
            fs.append(executor.submit(my_func, args))
        # data exhausted - get leftover results as they finish
        for future in concurrent.futures.as_completed(fs):
            print(f'pending:{len(executor._pending_work_items)}')
            result = future.result()
            results.append(result)
    pprint(results)
