ProcessPoolExecutor hangs when using map under heavy load



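We're seeing hangs when running map on a ProcessPoolExecutor, but only under relatively large loads.

The behavior we see is that after roughly 1 minute of hard work, the job appears to hang: CPU utilization drops sharply and then goes idle, and the stack traces show the same part of the call chain every time we sample them.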

    import concurrent.futures
    import multiprocessing as mp

    def work_wrapper(args):
        # map() passes one item per task, so unpack the argument list here
        return work(*args)

    def work(...):
        ...  # the actual work (elided)

    def start_working(...):
        with concurrent.futures.ProcessPoolExecutor(max_workers=num_threads, mp_context=mp.get_context('fork')) as executor:
            args = [arg_list1, arg_list2, ...]
            for res in executor.map(work_wrapper, args):
                pass

    if __name__ == "__main__":
        mp.set_start_method('fork', force=True)
        start_working(...)

Stack traces (we log them every 5 minutes, but they all look very similar):

    Thread 0x00007f4d0ca27700 (most recent call first):
      File "/usr/local/lib/python3.10/multiprocessing/connection.py", line 373 in _send
      File "/usr/local/lib/python3.10/multiprocessing/connection.py", line 402 in _send_bytes
      File "/usr/local/lib/python3.10/multiprocessing/connection.py", line 205 in send_bytes
      File "/usr/local/lib/python3.10/multiprocessing/queues.py", line 250 in _feed
      File "/usr/local/lib/python3.10/threading.py", line 953 in run
      File "/usr/local/lib/python3.10/threading.py", line 1016 in _bootstrap_inner
      File "/usr/local/lib/python3.10/threading.py", line 973 in _bootstrap
    Thread 0x00007f4d156fc700 (most recent call first):
      File "/usr/local/lib/python3.10/threading.py", line 1116 in _wait_for_tstate_lock
      File "/usr/local/lib/python3.10/threading.py", line 1096 in join
      File "/usr/local/lib/python3.10/multiprocessing/queues.py", line 199 in _finalize_join
      File "/usr/local/lib/python3.10/multiprocessing/util.py", line 224 in __call__
      File "/usr/local/lib/python3.10/multiprocessing/queues.py", line 151 in join_thread
      File "/usr/local/lib/python3.10/concurrent/futures/process.py", line 515 in join_executor_internals
      File "/usr/local/lib/python3.10/concurrent/futures/process.py", line 469 in terminate_broken
      File "/usr/local/lib/python3.10/concurrent/futures/process.py", line 323 in run
      File "/usr/local/lib/python3.10/threading.py", line 1016 in _bootstrap_inner
      File "/usr/local/lib/python3.10/threading.py", line 973 in _bootstrap
    Thread 0x00007f4d19cce740 (most recent call first):
      File "/usr/local/lib/python3.10/threading.py", line 1116 in _wait_for_tstate_lock
      File "/usr/local/lib/python3.10/threading.py", line 1096 in join
      File "/usr/local/lib/python3.10/concurrent/futures/process.py", line 775 in shutdown
      File "/usr/local/lib/python3.10/concurrent/futures/_base.py", line 649 in __exit__
      File "/app/main.py", line 256 in start_working
      File "/app/main.py", line 51 in main
      File "/app/main.py", line 96 in <module>

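Python version: 3.10.8; Docker base image: python:3.10-slim

I tried updating the Python version and changing the multiprocessing context (I tried both spawn and fork; both give the same behavior).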

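The problem you're hitting is that Executor.map does not handle large or infinite iterable inputs the way the built-in map does: it is not lazy. Before yielding a single value, it consumes the entire input iterator and submits a task for every input.

If your inputs are generated lazily (which in theory should keep memory usage low), that doesn't help: they are all read immediately. If they are infinite (say you planned to break and stop on receiving a particular result), the program will try to submit an infinite number of tasks and you will run out of memory. If they are merely very large, you pay the overhead for every submitted task up front (bookkeeping, pickling the arguments to pass them to the workers, and so on).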
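To see the eager consumption described above for yourself, here is a minimal sketch. It uses ThreadPoolExecutor only to keep the example small and free of pickling concerns; the names `lazy_inputs`, `pulled`, and `square` are made up for illustration. It assumes map is called without any buffering option, as in the question.

```python
import concurrent.futures

pulled = []  # records every input the executor has taken from the generator

def lazy_inputs(n):
    # A lazy source: each value is produced only when something asks for it.
    for i in range(n):
        pulled.append(i)
        yield i

def square(x):
    return x * x

with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
    results = executor.map(square, lazy_inputs(1000))
    # No result has been requested yet, but the generator is already drained.
    consumed_before_first_result = len(pulled)
    first = next(results)

print(consumed_before_first_result, first)
```

The count printed before the first result is requested shows how many inputs were already read and submitted.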

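If you can get by with processing the input in reasonably large chunks, that is an easy fix. You could also copy the fixed implementation of Executor.map from the PR attached to the issue I linked and use it as a top-level function, passing the executor in manually as the self argument (it is implemented entirely in terms of submit calls on the underlying executor, so it doesn't need to be an instance method).

By default, the fixed version pulls and submits twice as many tasks as there are workers in the pool. It then pulls and submits additional tasks only as the earlier ones complete and the caller requests their results. So if you are looping over the results as they arrive without storing them, the extra memory cost is proportional to the number of workers (usually small and fixed) rather than to the total number of inputs (which may be huge).

Here's an adapted version of that fixed Executor.map (untested; please comment if I made a typo somewhere):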
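The chunked approach mentioned above could look roughly like the following. This is only a sketch: `map_in_batches` and `batch_size` are hypothetical names, not part of concurrent.futures, and the demo uses ThreadPoolExecutor purely so it runs anywhere without a `__main__` guard.

```python
import concurrent.futures
import itertools

def map_in_batches(executor, fn, iterable, batch_size):
    """Yield fn(x) for each x, submitting at most batch_size tasks at a time.

    Hypothetical helper for illustration; not a library function.
    """
    it = iter(iterable)
    while True:
        # Pull only the next batch_size inputs from the (possibly lazy) source.
        batch = list(itertools.islice(it, batch_size))
        if not batch:
            return
        # Executor.map is still eager, but only over this bounded batch.
        yield from executor.map(fn, batch)

def square(x):
    return x * x

with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
    batched_results = list(
        map_in_batches(executor, square, iter(range(10)), batch_size=3)
    )

print(batched_results)
```

The trade-off is that each batch must finish completely before the next one is submitted, so workers can sit idle at batch boundaries.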

    import collections
    import itertools
    import time

    def executor_map(executor, fn, *iterables, timeout=None, chunksize=1, prefetch=None):
        """Returns an iterator equivalent to map(fn, iter).
        Args:
            executor: An Executor to submit the tasks to
            fn: A callable that will take as many arguments as there are
                passed iterables.
            timeout: The maximum number of seconds to wait. If None, then there
                is no limit on the wait time.
            chunksize: The size of the chunks the iterable will be broken into
                before being passed to a child process. This argument is only
                used by ProcessPoolExecutor; it is ignored by
                ThreadPoolExecutor. (This adapted function accepts it but
                does not use it.)
            prefetch: The number of chunks to queue beyond the number of
                workers on the executor. If None, a reasonable default is used.
        Returns:
            An iterator equivalent to: map(func, *iterables) but the calls may
            be evaluated out-of-order.
        Raises:
            TimeoutError: If the entire result iterator could not be generated
                before the given timeout.
            Exception: If fn(*args) raises for any values.
        """
        if timeout is not None:
            end_time = timeout + time.monotonic()
        if prefetch is None:
            # Note: _max_workers is a private attribute of the executor
            prefetch = executor._max_workers
        if prefetch < 0:
            raise ValueError("prefetch count may not be negative")
        argsiter = zip(*iterables)
        initialargs = itertools.islice(argsiter, executor._max_workers + prefetch)
        fs = collections.deque(executor.submit(fn, *args) for args in initialargs)
        # Yield must be hidden in closure so that the futures are submitted
        # before the first iterator value is required.
        def result_iterator():
            nonlocal argsiter
            try:
                while fs:
                    if timeout is None:
                        res = fs.popleft().result()
                    else:
                        res = fs.popleft().result(end_time - time.monotonic())
                    # Dispatch next task before yielding to keep
                    # pipeline full
                    if argsiter:
                        try:
                            args = next(argsiter)
                        except StopIteration:
                            argsiter = None
                        else:
                            fs.append(executor.submit(fn, *args))
                    yield res
            finally:
                for future in fs:
                    future.cancel()
        return result_iterator()

You could then use it by changing your code to:

    def start_working(...):
        with concurrent.futures.ProcessPoolExecutor(max_workers=num_threads, mp_context=mp.get_context('fork')) as executor:
            args = [arg_list1, arg_list2, ...]
            for res in executor_map(executor, work_wrapper, args):
                pass
