我正在运行jupyter 5.0.0笔记本,带有python 3.5.3在Windows 10上。以下示例代码无法运行:
from concurrent.futures import as_completed, ProcessPoolExecutor
import time
import numpy as np
def do_work(idx1, idx2):
time.sleep(0.2)
return np.mean([idx1, idx2])
with ProcessPoolExecutor(max_workers=4) as executor:
futures = set()
for idx in range(32):
future = winprocess.submit(
executor, do_work, idx, idx * 2
)
futures.add(future)
for future in as_completed(futures):
print(future.result())
...并抛出BrokenProcessPool: A process in the process pool was terminated abruptly while the future was running or pending.
该代码在Ubuntu 14.04。
我知道Windows没有os.fork
,因此多处理的处理方式不同,并且并不总是通过Interactive Mode和Jupyter播放。
在这种情况下,有哪些解决方法可以使ProcessPoolExecutor
起作用?
有一些类似的问题,但它们与multiprocessing.Pool
:
- 多处理。
仔细检查表明,jupyter笔记本可以运行外部Python模块,该模块使用ProcessPoolExecutor
并行化。因此,解决方案是在模块中执行代码的可行部分,并从Jupyter Notebook中调用。
也就是说,这可以推广为实用程序。以下可以存储为模块,例如winprocess.py
,并由Jupyter导入。
import inspect
import types
def execute_source(callback_imports, callback_name, callback_source, args):
for callback_import in callback_imports:
exec(callback_import, globals())
exec('import time' + "n" + callback_source)
callback = locals()[callback_name]
return callback(*args)
def submit(executor, callback, *args):
callback_source = inspect.getsource(callback)
callback_imports = list(imports(callback.__globals__))
callback_name = callback.__name__
future = executor.submit(
execute_source,
callback_imports, callback_name, callback_source, args
)
return future
def imports(callback_globals):
for name, val in list(callback_globals.items()):
if isinstance(val, types.ModuleType) and val.__name__ != 'builtins' and val.__name__ != __name__:
import_line = 'import ' + val.__name__
if val.__name__ != name:
import_line += ' as ' + name
yield import_line
这是您将如何使用此信息:
from concurrent.futures import as_completed, ProcessPoolExecutor
import time
import numpy as np
import winprocess
def do_work(idx1, idx2):
time.sleep(0.2)
return np.mean([idx1, idx2])
with ProcessPoolExecutor(max_workers=4) as executor:
futures = set()
for idx in range(32):
future = winprocess.submit(
executor, do_work, idx, idx * 2
)
futures.add(future)
for future in as_completed(futures):
print(future.result())
请注意,executor
已使用winprocess
更改,原始executor
作为参数传递给submit
函数。
这里发生的事情是笔记本功能代码和导入已序列化并传递给模块进行执行。该代码直到在新过程中安全地执行,因此不会尝试根据Jupyter Notebook本身进行新的过程。
。进口以维持别名的方式处理。如果您确保导入函数本身内执行的功能所需的所有内容,则可以删除导入魔法。
另外,此解决方案仅在将所有必要的变量作为参数传递给函数时才起作用。可以说,该功能应该是静态的,但是我认为这也是ProcessPoolExecutor
的要求。最后,确保您不执行笔记本其他地方定义的其他功能。只会导入外部模块,因此将不包括其他笔记本功能。