Python ThreadPoolExecutor (concurrent.futures) memory leak



Hi, I'm trying to load a big list (list.txt) and send each of its lines to a function, Do_something(), via concurrent.futures.ThreadPoolExecutor. The problem is that no matter what I do, memory usage keeps growing. At first I thought the cause was reading list.txt into a variable (a list), so I changed the code from list = open("list.txt").readlines() to for i in open("list.txt").readlines(), but the problem is still there. Is it possible to free the memory for each line once its work is done?

My code:

import time
from concurrent.futures import ThreadPoolExecutor

def Do_something(i):
    time.sleep(5)  # Do something ~ takes a few sec
    pass

if __name__ == "__main__":
    # list = open("list.txt").readlines()
    # even with 1 thread the code has the problem
    with ThreadPoolExecutor(1) as executor:
        try:
            # list.txt == 10,000,000 lines
            [executor.submit(Do_something, i) for i in open("list.txt").readlines()]
        except Exception as exx:
            pass

First, drop the .readlines() call entirely; a file object is already an iterable of its lines, so all you're doing is forcing it to build a list containing every line, and then a list of all the tasks that use those lines. As a rule, .readlines() is never necessary (it's a micro-optimized list(fileobj), and when you don't need the list, you don't want to use it).
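To make the difference concrete, here's a minimal sketch reusing the question's list.txt and Do_something (those two names come from the question; the rest is purely illustrative):

# Materializes all 10,000,000 lines as a list up front:
for i in open("list.txt").readlines():
    Do_something(i)

# Iterates the file object directly; only one line is held at a time:
with open("list.txt") as f:
    for i in f:
        Do_something(i)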

Second, you're trying to create tasks for all of the input lines before pulling results from any of them. Avoiding .readlines() saves the overhead of the list wrapping all those lines, but you're still holding every line in memory at once, one per queued task. If you don't have enough memory to hold all the tasks simultaneously, you can't do it this way.
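If you don't need the full map replacement shown next, one common alternative is to throttle submission with a semaphore so that only a bounded number of futures exist at any moment. This is a sketch under assumptions, not part of the original answer; max_pending=32 and the run_bounded/wrapped names are arbitrary choices:

import threading
import time
from concurrent.futures import ThreadPoolExecutor

def Do_something(i):
    time.sleep(5)  # Placeholder work, as in the question

def run_bounded(path, max_pending=32):
    sem = threading.BoundedSemaphore(max_pending)

    def wrapped(line):
        try:
            Do_something(line)
        finally:
            sem.release()  # Free a submission slot when the task finishes

    with ThreadPoolExecutor() as executor:
        with open(path) as f:
            for line in f:
                sem.acquire()  # Block until fewer than max_pending tasks are in flight
                executor.submit(wrapped, line)

if __name__ == "__main__":
    run_bounded("list.txt")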

If you want a bounded number of tasks queued, processing results as they complete and queueing new tasks as slots free up, you can do something like this (adapted from a patch that makes Executor.map avoid exactly the problem you're hitting):

import collections
import itertools
import time

def executor_map(executor, fn, *iterables, timeout=None, chunksize=1, prefetch=None):
    """Returns an iterator equivalent to map(fn, iter).

    Args:
        executor: An Executor to submit the tasks to.
        fn: A callable that will take as many arguments as there are
            passed iterables.
        timeout: The maximum number of seconds to wait. If None, then there
            is no limit on the wait time.
        chunksize: The size of the chunks the iterable will be broken into
            before being passed to a child process. This argument is only
            used by ProcessPoolExecutor; it is ignored by
            ThreadPoolExecutor.
        prefetch: The number of chunks to queue beyond the number of
            workers on the executor. If None, a reasonable default is used.

    Returns:
        An iterator equivalent to map(fn, *iterables), but the calls may
        be evaluated out of order.

    Raises:
        TimeoutError: If the entire result iterator could not be generated
            before the given timeout.
        Exception: If fn(*args) raises for any values.
    """
    if timeout is not None:
        end_time = timeout + time.monotonic()
    if prefetch is None:
        prefetch = executor._max_workers
    if prefetch < 0:
        raise ValueError("prefetch count may not be negative")
    argsiter = zip(*iterables)
    initialargs = itertools.islice(argsiter, executor._max_workers + prefetch)
    fs = collections.deque(executor.submit(fn, *args) for args in initialargs)

    # Yield must be hidden in a closure so that the futures are submitted
    # before the first iterator value is required.
    def result_iterator():
        nonlocal argsiter
        try:
            while fs:
                if timeout is None:
                    res = fs.popleft().result()
                else:
                    res = fs.popleft().result(end_time - time.monotonic())
                # Dispatch the next task before yielding to keep the
                # pipeline full.
                if argsiter:
                    try:
                        args = next(argsiter)
                    except StopIteration:
                        argsiter = None
                    else:
                        fs.append(executor.submit(fn, *args))
                yield res
        finally:
            for future in fs:
                future.cancel()

    return result_iterator()

Once you have that map utility, you can change your code to:

if __name__ == "__main__":
    with ThreadPoolExecutor() as executor:
        try:
            # list.txt == 10,000,000 lines
            with open("list.txt") as f:  # Use a with statement to get deterministic file close
                for res in executor_map(executor, Do_something, f):
                    pass  # If Do_something returns useful values, you can use them here,
                          # with each result arriving in res
        except Exception as exx:
            pass

With this, only a limited number of tasks exist at any one time (more than the number of workers, since some may already have results you haven't retrieved yet), and the file is read lazily, so it doesn't exhaust your RAM.
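As a closing note, the patch this helper was adapted from eventually landed upstream: if I recall correctly, Python 3.14 added a buffersize parameter to Executor.map that bounds how many submitted tasks can be pending ahead of consumption. On a new enough interpreter, the helper above reduces to roughly this (the exact version and parameter are an assumption worth verifying against your documentation):

if __name__ == "__main__":
    with ThreadPoolExecutor() as executor:
        with open("list.txt") as f:
            # buffersize limits how many tasks are submitted but not yet consumed
            # (assumed available in Python 3.14+; verify before relying on it)
            for res in executor.map(Do_something, f, buffersize=32):
                pass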
