我正在尝试使用导入的模块实例化对象。为了使这些导入过程安全(因为我在Windows上(,我正在使用if __name__ == '__main__':
块内的import
语句。
我的文件看起来像这样:
main.py
# main.py
from multiprocessing import Process
# target func for new process
def init_child(foo_obj, bar_obj):
pass
if __name__ == "__main__":
# protect imports from child process
from foo import getFoo
from bar import getBar
# get new objects
foo_obj = getFoo()
bar_obj = getBar()
# start new process
child_p = Process(target=init_child, args=(foo_obj, bar_obj))
child_p.start()
# Wait for process to join
child_p.join()
foo.py
# foo.py
import os
print 'Foo Loaded by PID: ' + str(os.getpid())
class Foo:
def __init__(self):
pass
def getFoo():
# returning new instance of class
return Foo()
bar.py
# bar.py
import os
print 'Bar Loaded by PID: ' + str(os.getpid())
class Bar:
def __init__(self):
pass
def getBar():
# not returning a new instance
return 'bar'
输出
Foo Loaded by PID: 58760
Bar Loaded by PID: 58760
Foo Loaded by PID: 29376
我获得的输出表示foo
模块已加载两次。我了解解释器再次执行主模块(因为Windows不支持fork
系统调用(,但是奇怪的是它是在__main__
块内导入的。
共享对象时可能是一个问题;就像从专用模块导入的队列一样。有什么想法会导致这件事?
谢谢!
这是因为在Windows多处理上使用方法spawn
来启动新进程。正如您提到的,与fork
不同,此方法复制了过程所需的所有内容以在过程本身中启动工作。因此,如果将任何对象作为参数启动以启动一个过程,则使用该对象持有的所有必需数据都使用泡菜序列化,并将其发送到该过程,其中使用该数据重新创建对象/重复对象。
这如何影响您的您的代码?
您使用Process
类将Foo
类的实例传递给init_child
。此后发生的事情是,实例内的数据以及类的全名和路径被腌制并发送到新过程。请记住,腌制的数据不包括类的任何代码或类属性,只是它的名称。然后使用类和实例数据的名称,创建一个新对象,准备使用。
您可能已经注意到,只有在新过程中已经定义了Foo
类的情况下,整个过程才能起作用,为此,定义Foo
的模块也必须被导入其。多处理会自动为您处理所有这些,这是您案例中歧义的原因。
要使事情变得更清晰,请在下面的代码下运行此操作,该代码也打印了工作过程的PID,并创建了3个工人:
from multiprocessing import Process
import os
# target func for new process
def init_child(foo_obj, bar_obj):
print(f"Worker Process loaded in PID: {os.getpid()}")
pass
if __name__ == "__main__":
# protect imports from child process
from foo import getFoo
from bar import getBar
# get new objects
foo_obj = getFoo()
bar_obj = getBar()
for _ in range(3):
print()
# start new process
child_p = Process(target=init_child, args=(foo_obj, bar_obj))
child_p.start()
# Wait for process to join
child_p.join()
输出
Foo Loaded by PID: 41320
Bar Loaded by PID: 41320
Foo Loaded by PID: 23864
Worker Process loaded in PID: 23864
Foo Loaded by PID: 90256
Worker Process loaded in PID: 90256
Foo Loaded by PID: 123352
Worker Process loaded in PID: 123352
,如您所见,foo.py
正在与工人相同的过程中导入以重新创建实例。
那么,什么是解决方案?
您可以使用经理。这些产生了一个服务器进程,其中创建和存储了托管对象,从而允许其他进程访问其共享状态而无需创建重复。在您的情况下,foo.py
仍将导入一个额外的时间来在服务器进程中创建实例。但是,在完成此操作之后,您可以自由地将实例传递给其他过程,而不必担心在该过程中导入的模块foo.py
。
main.py
from multiprocessing import Process
from multiprocessing.managers import BaseManager
import os
# target func for new process
def init_child(foo_obj, bar_obj):
print(f"Worker Process loaded in PID: {os.getpid()}")
pass
if __name__ == "__main__":
# protect imports from child process
import ultra_random_test
from foo import get_foo_with_manager, Foo
from bar import getBar
BaseManager.register('Foo', Foo)
manager = BaseManager()
manager.start()
# get new objects
foo_obj = get_foo_with_manager(manager)
bar_obj = getBar()
# start new process
for _ in range(3):
print()
child_p = Process(target=init_child, args=(foo_obj, bar_obj))
child_p.start()
# Wait for process to join
child_p.join()
foo.py
import os
print('Foo Loaded by PID: ' + str(os.getpid()))
class Foo:
def __init__(self):
pass
def get_foo_with_manager(manager):
return manager.Foo()
输出
Foo Loaded by PID: 38332
Bar Loaded by PID: 38332
Foo Loaded by PID: 56632
Worker Process loaded in PID: 63404
Worker Process loaded in PID: 66400
Worker Process loaded in PID: 56044
您可以看到,在manager
创建的服务器过程中仅实例化foo_obj
。之后,foo.py
再也没有进口。
经理的基准
i通过比较使用NamespaceProxy
获取属性值的速度来测试代理与非托管对象的速度相比的速度:
import time
from multiprocessing.managers import BaseManager, NamespaceProxy
class TimeNamespaceProxy(NamespaceProxy):
def __getattr__(self, key):
t = time.time()
super().__getattr__(key)
return time.time() - t
class A:
def __init__(self):
self.num = 3
def worker(proxy_a):
times = []
for _ in range(10000):
times.append(proxy_a.num)
print(
f'Time taken for 10000 calls to get attribute with proxy is : {sum(times)}, with the avg being: {sum(times) / len(times)}')
if __name__ == "__main__":
BaseManager.register('A', A, TimeNamespaceProxy, exposed=tuple(dir(TimeNamespaceProxy)))
manager = BaseManager()
manager.start()
num = 10000
a = A()
proxy_a = manager.A()
times = []
t = time.perf_counter()
for _ in range(num):
a.num
times.append(time.perf_counter() - t)
print(f'Time taken for {num} calls to get attribute without proxy is : {sum(times)}, with the avg being: {sum(times) / num}')
times = []
for _ in range(10000):
times.append(proxy_a.num)
print(
f'Time taken for {num} calls to get attribute with proxy is : {sum(times)}, with the avg being: {sum(times) / num}')
输出
Time taken for 10000 calls to get attribute without proxy is : 0.0011279000000000705, with the avg being: 1.1279000000000705e-07
Time taken for 10000 calls to get attribute with proxy is : 1.751499891281128, with the avg being: 0.0001751499891281128
您可以看到,不使用代理的速度超过1000倍,但是,绝对术语中,使用代理仅需大约0.2 ms即可腌制/取消函数名称 __getattribute__
和参数,然后outckle/outckle/nuckle the the the结果。对于大多数应用程序,仅将功能名称发送到管理员流程将不是瓶颈。只有在要发送的返回值/参数很复杂并且需要更长的时间时,事情才会变得棘手,但是在使用多处理时,这些是常见的恶作剧(我建议您在可能的情况下将重复的参数存储在托管对象本身中,以便在可能的情况下减少开销(。
简而言之,实施过程安全将始终在性能方面取决于权衡。如果您的用例需要特别性能的代码,那么您可能需要重新评估您对语言的选择,因为Python