我正在运行一个使用 ProcessPoolExecutor 的 Tornado 服务器,以并行处理多个请求。问题是,在某种特定情况下,当其中一个工作进程中抛出异常时,该异常不会被传播回来,而是进程直接崩溃,并出现以下错误:
concurrent.futures.process._RemoteTraceback:
'''
Traceback (most recent call last):
  File "C:\Users\ActionICT\anaconda3\lib\concurrent\futures\process.py", line 367, in _queue_management_worker
    result_item = result_reader.recv()
  File "C:\Users\ActionICT\anaconda3\lib\multiprocessing\connection.py", line 251, in recv
    return _ForkingPickler.loads(buf.getbuffer())
TypeError: __init__() missing 1 required positional argument: 'is_local'
'''

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "C:\S1\Product\Baseline\PYTHON\lab\controller.py", line 558, in get
    output = exec_future.result()
  File "C:\Users\ActionICT\anaconda3\lib\concurrent\futures\_base.py", line 428, in result
    return self.__get_result()
  File "C:\Users\ActionICT\anaconda3\lib\concurrent\futures\_base.py", line 384, in __get_result
    raise self._exception
concurrent.futures.process.BrokenProcessPool: A process in the process pool was terminated abruptly while the future was running or pending.
我用调试器排查后,发现问题出在执行下面这个函数的时候:
def _send_bytes(self, buf):
    """Write ``buf`` to ``self._handle`` using an overlapped ``WriteFile``.

    The write is started in overlapped mode; if it is reported as still
    pending, the call blocks on the operation's event until it is
    signalled. The overlapped result is always collected, and the call
    then asserts that no error was reported and that the whole buffer
    was written.

    Args:
        buf: The bytes-like payload to write; its ``len()`` must equal
            the number of bytes reported as written.

    Raises:
        AssertionError: If the wait result, error code or byte count is
            not the expected one (only when assertions are enabled).
    """
    overlapped, status = _winapi.WriteFile(
        self._handle, buf, overlapped=True)
    try:
        if status == _winapi.ERROR_IO_PENDING:
            # The write has not completed yet: block (no timeout) until
            # the overlapped operation's event is signalled.
            signalled = _winapi.WaitForMultipleObjects(
                [overlapped.event], False, INFINITE)
            assert signalled == WAIT_OBJECT_0
    except BaseException:
        # Any interruption (including KeyboardInterrupt) cancels the
        # in-flight write before the exception continues to propagate.
        overlapped.cancel()
        raise
    finally:
        # Always collect the final outcome of the overlapped operation.
        bytes_written, status = overlapped.GetOverlappedResult(True)
    assert status == 0
    assert bytes_written == len(buf)
当工作进程试图把异常传播给相应的 Future 对象时,就会调用这个函数。在函数的第一行,也就是调用 _winapi.WriteFile 的时候,调试器里的一切都崩溃了,我不明白为什么。有人知道原因吗?
我找到了一个变通方法:把在独立进程中执行的那个函数的函数体包在 try-except 里,然后把旧异常的信息复制到一个新的异常中并抛出这个新异常。我不知道为什么……但这样确实有效。
def _execute_tuning(tune_parameters: TuneParameters):
    """Run one tuning scenario: generate configs, fit the tree, compute KPIs.

    Steps, in order: generate the project config and the session-log
    config for the project/scenario named in ``tune_parameters``, fit a
    ``DecisionTreeGenerator`` with the requested hyper-parameters, then
    run ``KPICalculator`` with ``do_optimization_kpi=False``.

    Args:
        tune_parameters: Carries ``project_name``, ``scenario_name``,
            ``max_depth``, ``columns``, ``min_samples_per_leaf`` and
            ``max_leaf_nodes``.

    Returns:
        None.

    Raises:
        Exception: Any failure is logged and re-raised as a plain
            ``Exception`` carrying the original message (or the original
            ``repr`` when the message is empty), chained to the original
            error via ``__cause__``.
    """
    # TODO: to be refactored; this is the function meant to be parallelized.
    # NOTE(review): the original comment says the result or error is
    # written to the output; presumably that happens inside the helpers
    # called below -- confirm.
    try:
        project_name = tune_parameters.project_name
        scenario_name = tune_parameters.scenario_name

        config.generate_project_config(
            project_name=project_name,
            scenario_name=scenario_name,
        )
        config.generate_session_log_config(
            project_name=project_name,
            scenario_name=scenario_name,
        )

        tree = DecisionTreeGenerator(project_name, scenario_name)
        tree.fit(
            # TODO: refactor.
            # NOTE(review): this compares the whole ``tune_parameters``
            # object with the string 'true'; a field of it (an auto-tune
            # flag?) was probably intended -- confirm and fix.
            auto_tune=tune_parameters == 'true',
            max_depth=tune_parameters.max_depth,
            columns=tune_parameters.columns,
            min_samples_leaf=tune_parameters.min_samples_per_leaf,
            max_leaf_nodes=tune_parameters.max_leaf_nodes,
        )

        kpi = KPICalculator(project_name, scenario_name)
        kpi.run(do_optimization_kpi=False)
    except Exception as exc:
        Loggers.application.exception(exc)
        # Re-raise as a builtin Exception built from a single string:
        # presumably this is what lets the error be pickled back to the
        # parent process, unlike exception classes whose __init__ needs
        # extra required arguments.
        # Fall back to repr() so an exception with an empty message still
        # identifies itself.
        message = str(exc) or repr(exc)
        # ``from exc`` records the original error (and its traceback) as
        # the explicit cause instead of mutating ``__traceback__`` by hand.
        raise Exception(message) from exc