在 IPython 中运行 ProcessPoolExecutor



我在MacBook上的IPython解释器(IPython 7.9.0,Python 3.8.0(中运行一个简单的多处理示例,遇到了一个奇怪的错误。 这是我输入的内容:

[In [1]: from concurrent.futures import ProcessPoolExecutor
[In [2]: executor=ProcessPoolExecutor(max_workers=1)
[In [3]: def func():
print('Hello')
[In [4]: future=executor.submit(func)

但是,我收到以下错误:

Traceback (most recent call last):
File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/multiprocessing/process.py", line 313, in _bootstrap
self.run()
File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/multiprocessing/process.py", line 108, in run
self._target(*self._args, **self._kwargs)                                   
File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/concurrent/futures/process.py", line 233, in _process_worker
call_item = call_queue.get(block=True)
File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/multiprocessing/queues.py", line 116, in get
return _ForkingPickler.loads(res)
AttributeError: Can't get attribute 'func' on <module '__main__' (built-in)>

此外,尝试再次提交作业给了我一个不同的错误:

[In [5]: future=executor.submit(func)                                            
---------------------------------------------------------------------------
BrokenProcessPool                         Traceback (most recent call last)
<ipython-input-5-42bad1a6fe80> in <module>
----> 1 future=executor.submit(func)
/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/concurrent/futures/process.py in submit(*args, **kwargs)
627         with self._shutdown_lock:
628             if self._broken:
--> 629                 raise BrokenProcessPool(self._broken)
630             if self._shutdown_thread:
631                 raise RuntimeError('cannot schedule new futures after shutdown')
BrokenProcessPool: A child process terminated abruptly, the process pool is not usable anymore

作为健全性检查,我在 Python 文件中键入相同(几乎(的代码并从命令行 (python3 test.py( 运行它。 它工作正常。

为什么IPython对我的测试有问题?

编辑:

这是工作正常的Python文件。

from concurrent.futures import ProcessPoolExecutor as Executor
def func():
print('Hello')
if __name__ == '__main__':
with Executor(1) as executor:
future=executor.submit(func)
print(future.result())

TLDR;

import multiprocessing as mp
from concurrent.futures import ProcessPoolExecutor
# create child processes using 'fork' context
executor = ProcessPoolExecutor(max_workers=1, mp_context=mp.get_context('fork'))

这实际上是由MacOS上的python 3.8切换到"spawn"方法来创建子进程引起的;而不是3.8之前的默认值"fork"。以下是一些本质区别:

叉:

  • 克隆父进程的数据和代码,因此继承父程序的状态。
  • 子进程对继承的变量所做的任何修改都不会反映父进程中这些变量的状态。状态基本上是从这一点分的(写入时复制(。
  • 父流程中导入的所有库都可用于子进程的上下文。这也使此方法快速,因为子进程不必重新导入库(代码(和变量(数据(。
  • 这有一些缺点,尤其是在分叉多线程程序方面。
  • 一些带有C后端的库,如Tensorflow,OpenCV等,不是fork安全的,并导致子进程以非确定性的方式挂起。

产卵:

  • 为子进程创建新的解释器,而不继承代码或数据。
  • 仅将必要的数据/参数发送到子进程。这意味着变量、线程锁、文件描述符等不会自动用于子进程——这避免了难以捕获的错误。
  • 这种方法也有一些缺点——因为数据/参数需要发送到子进程,所以它们也必须是可腌制的。一些具有内部锁/互斥锁的对象(如队列(是不可酸洗的,并且酸洗较重的对象(如数据帧和大型 numpy 数组(是昂贵的。
  • 取消子进程上的对象将导致重新导入关联的库(如果有(。这又增加了时间。
  • 由于父代码未克隆到子进程中,因此在创建子进程时需要使用if __name__ == '__main__'防护。不这样做将使子进程无法从父进程导入代码(现在以进程运行(。这也是您的程序在与防护装置一起使用时工作的原因。

如果你注意到 fork 会带来一些由你的程序或导入的非 fork 安全库引起的不可预测的影响,你可以:

  • (a( 全局设置多处理的上下文以使用"fork"方法:
import multiprocessing as mp
mp.set_start_method("fork")

请注意,这将全局设置上下文,一旦设置,您或任何其他导入的库将无法更改此上下文。

  • (b( 使用多处理的get_context方法在本地设置上下文:
import multiprocessing as mp
mp_fork = mp.get_context('fork')
# mp_fork has all the attributes of mp so you can do:
mp_fork.Process(...)  
mp_fork.Pool(...)
# using local context will not change global behaviour:
# create child process using global context
# default is fork in < 3.8; spawn otherwise
mp.Process(...)
# most multiprocessing based functionality like ProcessPoolExecutor 
# also take context as an argument:
executor=ProcessPoolExecutor(max_workers=1, mp_context=mp_fork)

好的,终于知道发生了什么。 问题是Mac OS - 它默认使用"spawn"方法来创建子进程。这里 https://docs.python.org/3/library/multiprocessing.html 解释了这一点,以及将其更改为fork的方法(尽管它指出fork在Mac os上不安全(。

使用spawn方法,启动一个新的Python解释器,并将你的代码提供给它。然后,这会尝试在 main 下找到您的函数,但在这种情况下没有 main,因为没有程序,只有解释的命令。

如果将启动方法更改为 fork,则代码将运行(但请注意,这是不安全的警告(

In [1]: import multiprocessing as mp                                                                                     
In [2]: mp.set_start_method("fork")                                                                                      
In [3]: def func(): 
...:     print("foo"); 
...:                                               
In [4]: from concurrent.futures import ProcessPoolExecutor                                                               
In [5]: executor=ProcessPoolExecutor(max_workers=1)                                                               
In [6]: future=executor.submit(func)                                                                                     
foo
In [7]:  

我不确定答案是否因为警告而有帮助,但它解释了为什么当你有一个程序(你的其他尝试(时它的行为不同,以及为什么它在 Ubuntu 上运行良好 - 它默认使用"fork"。

相关内容

  • 没有找到相关文章

最新更新