有人能告诉我为什么我们仍然不能在multiprocessing.Process
标准库中使用本地函数吗
我们在2022年,这个问题需要复制粘贴代码来解决…
def run_ping_outside():
ping_on_port('google.com', 1234, timeout=5)
class PingHelperUnitTest(unittest.TestCase):
def test_ping_on_port_timeout(self):
def run_ping_inside():
self.assertTrue(ping_on_port('google.com', 1234, timeout=5))
# Don't work
# process = Process(target=run_ping_inside)
process = Process(target=run_ping_outside)
process.start()
这是一个简单的例子,但真正的好处是为单元测试创建一个@timeout
装饰器
并且能够测试我们应用程序的某些部分的速度。
def timeout(max_timeout_allowed):
def _timeout(decorated_function):
def timeout_wrapper(self=None):
def timeout_inner(*args, **kwargs):
arguments = (args, kwargs) if self is None else (self, args, kwargs)
process = Process(target=decorated_function, args=arguments)
process.start()
sleep(max_timeout_allowed)
done_in_time = not process.is_alive()
if not done_in_time:
process.terminate()
if self is not None:
self.assertTrue(done_in_time, f"Function ran out out time: {max_timeout_allowed} second(s)")
return timeout_inner()
return timeout_wrapper
return _timeout
这里是这个装饰器的一个非工作示例,因为我们不能pickle一个装饰函数。。。
PS:不,我不能使用Thread
,因为我们无法终止它们。。。
所以,这有点像破解,但您可以通过将未修饰函数的副本保存在新名称下并更改__qualname__
来完成,这样pickle就知道在哪里可以找到保存的版本。然后,在子流程中,将运行相同的装饰器,将未装饰的函数注册到相同的位置。
这是代码:
from multiprocessing import Process
import time
import pickle
import sys
class TimeoutClosure(object):
def __init__(self, func, max_timeout_allowed):
self.func = func
self.max_timeout_allowed = max_timeout_allowed
self.register_inner_function(func)
def register_inner_function(self, func):
prefix = 'old_'
func_name = func.__qualname__
saved_name = prefix + func_name
module_name = pickle.whichmodule(func, func_name)
module = sys.modules[module_name]
setattr(module, saved_name, func)
self.func.__qualname__ = saved_name
def __call__(self, *args, **kwargs):
process = Process(target=self.func, args=args, kwargs=kwargs)
process.start()
# Note: this is faster than time.sleep(), as it exits early if
# the function finishes early.
process.join(self.max_timeout_allowed)
done_in_time = not process.is_alive()
if not done_in_time:
process.terminate()
if self is not None:
assert done_in_time, f"Function ran out out time: {self.max_timeout_allowed} second(s)"
def timeout(max_timeout_allowed):
def inner(func):
return TimeoutClosure(func, max_timeout_allowed)
return inner
@timeout(1)
def foo():
print("Entering foo")
time.sleep(2)
print("Leaving foo")
if __name__ == '__main__':
foo()