After running into what looked like a memory leak in a long-running multi-threaded script, I found out about `maxtasksperchild`, which can be used in a multiprocessing pool like this:
    import multiprocessing

    with multiprocessing.Pool(processes=32, maxtasksperchild=x) as pool:
        pool.imap(function, stuff)
Is something similar possible with the thread pool (`multiprocessing.pool.ThreadPool`)?
As noxdafox's answer says, there is no way to do it in the parent class, but you can use the `threading` module to control the maximum number of tasks per child. Since you want to use `multiprocessing.pool.ThreadPool`, and the `threading` module is similar, then...
    import threading

    def split_processing(yourlist, num_splits=4):
        '''
        yourlist = list which you want to pass to function for threading.
        num_splits = controls how many threads the work is split across.
        '''
        split_size = len(yourlist) // num_splits
        threads = []
        for i in range(num_splits):
            start = i * split_size
            # the last split also takes any leftover items
            end = len(yourlist) if i + 1 == num_splits else (i + 1) * split_size
            threads.append(threading.Thread(target=function, args=(yourlist, start, end)))
            threads[-1].start()

        # wait for all threads to finish
        for t in threads:
            t.join()
Say your list has 100 items; then:

    if num_splits = 10; then threads = 10, each thread has 10 tasks.
    if num_splits = 5;  then threads = 5, each thread has 20 tasks.
    if num_splits = 50; then threads = 50, each thread has 2 tasks.

and vice versa.
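The snippet above assumes a worker with the signature `function(yourlist, start, end)`. A minimal self-contained sketch of such a worker (the name `square_slice` and the shared `results` list are my own illustration, not part of the original answer):

```python
import threading

results = [None] * 100  # one result slot per input item

def square_slice(yourlist, start, end):
    # each thread writes only to its own slice, so no locking is needed
    for i in range(start, end):
        results[i] = yourlist[i] ** 2

def split_processing(yourlist, num_splits=4):
    split_size = len(yourlist) // num_splits
    threads = []
    for i in range(num_splits):
        start = i * split_size
        end = len(yourlist) if i + 1 == num_splits else (i + 1) * split_size
        threads.append(threading.Thread(target=square_slice,
                                        args=(yourlist, start, end)))
        threads[-1].start()
    for t in threads:
        t.join()

split_processing(list(range(100)), num_splits=10)  # results[i] == i ** 2
```

Because every thread owns a disjoint slice of `results`, the threads never race on the same index.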
Looking at the `multiprocessing.pool.ThreadPool` implementation, it becomes evident that the `maxtasksperchild` parameter is not propagated to the parent `multiprocessing.Pool` class. The `multiprocessing.pool.ThreadPool` implementation was never completed, so it lacks a few features (as well as tests and documentation).
The Pebble package implements a `ThreadPool` that supports restarting worker threads once they have processed a given number of tasks.
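If you would rather stay in the standard library, a coarse workaround is to recycle the whole pool yourself after a fixed number of tasks. A rough sketch (the helper name `map_with_recycling` and the chunking scheme are my own; note it restarts workers per chunk, not per individual worker task the way `maxtasksperchild` does):

```python
from multiprocessing.pool import ThreadPool

def map_with_recycling(func, values, processes=4, tasks_per_pool=100):
    """Emulate maxtasksperchild by replacing the pool every tasks_per_pool items."""
    results = []
    for start in range(0, len(values), tasks_per_pool):
        chunk = values[start:start + tasks_per_pool]
        # a fresh pool means fresh worker threads for every chunk
        with ThreadPool(processes=processes) as pool:
            results.extend(pool.map(func, chunk))
    return results
```

This keeps ordering the same as `pool.map`, at the cost of a pool-teardown barrier between chunks.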
I wanted a thread pool that runs a new task as soon as another task in the pool finishes (i.e. `maxtasksperchild=1`). I decided to write a small "ThreadPool" class that creates a new thread for every task. As soon as a task in the pool finishes, another thread is created for the next value in the iterable passed to the `map` method. The `map` method blocks until all values in the passed iterable have been processed and their threads have returned.
    import threading


    class ThreadPool():

        def __init__(self, processes=20):
            self.processes = processes
            self.threads = [Thread() for _ in range(0, processes)]

        def get_dead_threads(self):
            dead = []
            for thread in self.threads:
                if not thread.is_alive():
                    dead.append(thread)
            return dead

        def is_thread_running(self):
            return len(self.get_dead_threads()) < self.processes

        def map(self, func, values):
            attempted_count = 0
            values_iter = iter(values)
            # loop until all values have been attempted to be processed and
            # all threads are finished running
            while (attempted_count < len(values) or self.is_thread_running()):
                for thread in self.get_dead_threads():
                    try:
                        # run thread with the next value
                        value = next(values_iter)
                        attempted_count += 1
                        thread.run(func, value)
                    except StopIteration:
                        break

        def __enter__(self):
            return self

        def __exit__(self, exc_type, exc_value, exc_tb):
            pass


    class Thread():

        def __init__(self):
            self.thread = None

        def run(self, target, *args, **kwargs):
            self.thread = threading.Thread(target=target,
                                           args=args,
                                           kwargs=kwargs)
            self.thread.start()

        def is_alive(self):
            if self.thread:
                return self.thread.is_alive()
            else:
                return False
You can use it like this:
    def run_job(value, mp_queue=None):
        # do something with value
        value += 1

    with ThreadPool(processes=2) as pool:
        pool.map(run_job, [1, 2, 3, 4, 5])
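Since this `map` discards return values, the `mp_queue` parameter hints at collecting results through a queue instead. A minimal standalone sketch of that pattern (the queue-based `run_job` variant is my own illustration; with the pool above you would bind the queue argument via `functools.partial`, since `map` calls the function with a single value):

```python
import queue
import threading

results_q = queue.Queue()

def run_job(value, mp_queue=None):
    # push the computed value onto the queue instead of returning it
    mp_queue.put(value + 1)

# one thread per task, mirroring how the ThreadPool above runs jobs
threads = [threading.Thread(target=run_job, args=(v, results_q))
           for v in [1, 2, 3, 4, 5]]
for t in threads:
    t.start()
for t in threads:
    t.join()

collected = sorted(results_q.get() for _ in range(5))  # [2, 3, 4, 5, 6]
```

Sorting is only there to make the output deterministic, since the threads may finish in any order.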