我正在尝试创建一个python装饰器,它使用args和kwargs获取函数,在新进程中执行它,关闭它并返回函数返回的任何内容,包括引发相同的异常(如果有的话(。
目前,我的装饰器可以正常处理函数,如果它们没有引发异常,但无法提供回溯。如何将其传递回父进程?
from functools import wraps
from multiprocessing import Process, Queue
import sys
def process_wrapper(func):
@wraps(func)
def wrapper(*args, **kwargs):
# queue for communicating between parent and child processes
q = Queue()
def func_to_q(_q: Queue, *_args, **_kwargs):
# do the same as func, but put result into the queue. Also put
# there an exception if any.
try:
_res = func(*_args, **_kwargs)
_q.put(_res)
except:
_q.put(sys.exc_info())
# start another process and wait for it to join
p = Process(target=func_to_q, args=(q, )+args, kwargs=kwargs)
p.start()
p.join()
# get result from the queue and return it, or raise if it's an exception
res = q.get(False)
if isinstance(res, tuple) and isinstance(res[0], Exception):
raise res[1].with_traceback(res[2])
else:
return res
return wrapper
if __name__ == '__main__':
@process_wrapper
def ok():
return 'ok'
@process_wrapper
def trouble():
def inside():
raise UserWarning
inside()
print(ok())
print(trouble())
我希望结果是这样的:
ok
Traceback (most recent call last):
File "/temp.py", line 47, in <module>
print(trouble())
File "/temp.py", line 44, in trouble
inside()
File "/temp.py", line 43, in inside
raise UserWarning
UserWarning
Process finished with exit code 1
但是似乎子进程无法将堆栈跟踪放入队列中,我得到以下结果:
ok
Traceback (most recent call last):
File "/temp.py", line 47, in <module>
print(trouble())
File "/temp.py", line 26, in wrapper
res = q.get(False)
File "/usr/lib/python3.6/multiprocessing/queues.py", line 107, in get
raise Empty
queue.Empty
Process finished with exit code 1
此外,如果子项仅将异常本身放入队列_q.put(sys.exc_info()[1])
,则父级从那里获取它并引发但具有新的堆栈跟踪(注意缺少对inside()
的调用(:
ok
Traceback (most recent call last):
File "/temp.py", line 47, in <module>
print(trouble())
File "/temp.py", line 28, in wrapper
raise res
UserWarning
Process finished with exit code 1
看看multiprocessing.pool.py
和字符串化黑客,用于向父级发送异常。您可以从那里使用multiprocessing.pool.ExceptionWithTraceback
。
这足以演示基本原理
:from multiprocessing import Process, Queue
from multiprocessing.pool import ExceptionWithTraceback
def worker(outqueue):
try:
result = (True, 1 / 0) # will raise ZeroDivisionError
except Exception as e:
e = ExceptionWithTraceback(e, e.__traceback__)
result = (False, e)
outqueue.put(result)
if __name__ == '__main__':
q = Queue()
p = Process(target=worker, args=(q,))
p.start()
success, value = q.get()
p.join()
if success:
print(value)
else:
raise value # raise again
输出:
multiprocessing.pool.RemoteTraceback:
"""
Traceback (most recent call last):
File "/home/...", line 7, in worker
result = (True, 1 / 0) # will raise ZeroDivisionError
ZeroDivisionError: division by zero
"""
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/home/...", line 23, in <module>
raise value
ZeroDivisionError: division by zero
Process finished with exit code 1