为什么国会议员。池快速失败,但进程池执行器没有?



1.

with ProcessPoolExecutor() as executor:
futures = [executor.submit(foo, user_number, user_id)
for user_number, user_id in enumerate(user_ids, start=1)]
for future in as_completed(futures):
future.result()

阿拉伯数字。

pool = Pool()
results = [pool.apply_async(foo, args=(user_number, user_id))
for user_number, user_id in enumerate(user_ids, start=1)]
for result in results:
result.get()
pool.close()
pool.join()

3.

pool = Pool()
results = [pool.apply_async(foo, args=(user_number, user_id))
for user_number, user_id in enumerate(user_ids, start=1)]
try:
for result in results:
result.get()
finally:
pool.close()
pool.join()

foo执行某些工作,然后引发值错误。

使用第一个示例时,我仅在所有期货完成后才得到异常。
在第二个示例中,一旦第一个作业失败,我就会出现异常。
对于第三个示例,它的作用与第一个示例类似

如何快速失败并在退出之前清理资源?

为什么会这样? 根据文档,as_completed在期货完成后立即返回期货,调用future.result()应该会引发异常。

Python 版本是 3.6.9

问题是 Python 无法安全地取消已经开始的作业。区别只在于你告诉Python做什么:

  • 情况 1:异常future.result()引发。然后,控制流从with语句中断,并触发ProcessPoolExecutor.__exit__。默认情况下,这会等待所有挂起的作业完成,因此执行挂起,直到这种情况完成。

  • 情况 2:Python 解释器在遇到异常时立即退出。但这并不意味着您的作业已停止运行!你永远不会等他们完成。

  • 情况 3:引发异常后,调用pool.join(),这与情况 1 中的情况大致相同。执行等待作业完成,然后退出。

您可以使用此脚本检查案例 2 中到底发生了什么:

import signal
from multiprocessing import Pool
import time

def throw():
raise ValueError()

def foo():
def sigterm_handler(*args):
print('received sigterm')
raise SystemExit()
signal.signal(signal.SIGTERM, sigterm_handler)
while True:
print('still alive')
time.sleep(0.1)
pool = Pool()
results = [pool.apply_async(throw), pool.apply_async(foo)]
time.sleep(1)
for result in results:
result.get()
pool.close()
pool.join()

在 OSX 上,这将输出:

$ python mp_test.py
still alive
still alive
still alive
still alive
still alive
still alive
still alive
still alive
still alive
still alive
multiprocessing.pool.RemoteTraceback:
"""
Traceback (most recent call last):
File "/usr/local/Cellar/python/3.7.3/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/pool.py", line 121, in worker
result = (True, func(*args, **kwds))
File "mp_test.py", line 8, in throw
raise ValueError()
ValueError
"""
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "mp_test.py", line 27, in <module>
result.get()
File "/usr/local/Cellar/python/3.7.3/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/pool.py", line 657, in get
raise self._value
ValueError
still alive
received sigterm

因此,当解释器退出时,工作人员会收到SIGTERM信号(不过,行为可能取决于操作系统(。请注意,SIGTERM可以忽略(例如,由您在工作线程中使用的第三方库忽略(,因此无法保证在这种情况下您的工作线程实际退出。


现在,如果您有长时间运行的作业,并且确定可以安全地取消它们(例如,因为它们不执行任何 I/O(,则可以使用类似以下内容来模拟案例 2 的行为:

with concurrent.futures.ProcessPoolExecutor() as executor:
try:
futures = [executor.submit(foo, user_number, user_id)
for user_number, user_id in enumerate(user_ids, start=1)]
for future in concurrent.futures.as_completed(futures):
future.result()
except Exception:
# abort workers immediately if anything goes wrong
for process in executor._processes.values():
process.terminate()
raise

这会将SIGTERM发送到遇到异常时仍在运行的所有作业,然后引发异常(并等待所有进程完成,以便您可以确定它们已停止(。同样,这不是一个优雅的退出 - 当您在 I/O 期间中断时,它可能导致数据丢失或资源悬空。

Python 文档对terminate方法是这样说的:

终止进程。在Unix上,这是使用SIGTERM信号完成的;在 Windows 上使用 TerminateProcess((。请注意,不会执行退出处理程序和 finally 子句等。

请注意,进程的后代进程不会被终止 - 它们只会成为孤立进程。

警告

如果在关联的进程使用管道或队列时使用此方法,则管道或队列可能会损坏,并且可能被其他进程无法使用。同样,如果进程已获取锁或信号量等,则终止它可能会导致其他进程死锁。

相关内容

  • 没有找到相关文章

最新更新