如何避免使用Python多处理模块的双重进口



我正在尝试使用导入的模块实例化对象。为了使这些导入过程安全(因为我在Windows上(,我正在使用if __name__ == '__main__':块内的import语句。

我的文件看起来像这样:

main.py

# main.py
from multiprocessing import Process
# target func for new process
def init_child(foo_obj, bar_obj):
  pass
if __name__ == "__main__":
  # protect imports from child process
  from foo import getFoo
  from bar import getBar
  # get new objects
  foo_obj = getFoo()
  bar_obj = getBar()
  # start new process
  child_p = Process(target=init_child, args=(foo_obj, bar_obj))
  child_p.start()
  # Wait for process to join
  child_p.join()

foo.py

# foo.py
import os
print 'Foo Loaded by PID: ' + str(os.getpid())
class Foo:
  def __init__(self):
    pass
def getFoo():
  # returning new instance of class
  return Foo()

bar.py

# bar.py
import os
print 'Bar Loaded by PID: ' + str(os.getpid())
class Bar:
  def __init__(self):
    pass
def getBar():
  # not returning a new instance
  return 'bar'

输出

Foo Loaded by PID: 58760
Bar Loaded by PID: 58760
Foo Loaded by PID: 29376

我获得的输出表示foo模块已加载两次。我了解解释器再次执行主模块(因为Windows不支持fork系统调用(,但是奇怪的是它是在__main__块内导入的。

共享对象时可能是一个问题;就像从专用模块导入的队列一样。有什么想法会导致这件事?

谢谢!

这是因为在Windows多处理上使用方法spawn来启动新进程。正如您提到的,与fork不同,此方法复制了过程所需的所有内容以在过程本身中启动工作。因此,如果将任何对象作为参数启动以启动一个过程,则使用该对象持有的所有必需数据都使用泡菜序列化,并将其发送到该过程,其中使用该数据重新创建对象/重复对象。

这如何影响您的您的代码?

您使用Process类将Foo类的实例传递给init_child。此后发生的事情是,实例内的数据以及类的全名和路径被腌制并发送到新过程。请记住,腌制的数据不包括类的任何代码或类属性,只是它的名称。然后使用类和实例数据的名称,创建一个新对象,准备使用。

您可能已经注意到,只有在新过程中已经定义了Foo类的情况下,整个过程才能起作用,为此,定义Foo的模块也必须被导入其。多处理会自动为您处理所有这些,这是您案例中歧义的原因。

要使事情变得更清晰,请在下面的代码下运行此操作,该代码也打印了工作过程的PID,并创建了3个工人:

from multiprocessing import Process
import os

# target func for new process
def init_child(foo_obj, bar_obj):
    print(f"Worker Process loaded in PID: {os.getpid()}")
    pass

if __name__ == "__main__":
    # protect imports from child process
    from foo import getFoo
    from bar import getBar
    # get new objects
    foo_obj = getFoo()
    bar_obj = getBar()
for _ in range(3):
    print()
    # start new process
    child_p = Process(target=init_child, args=(foo_obj, bar_obj))
    child_p.start()
    # Wait for process to join
    child_p.join()

输出

    Foo Loaded by PID: 41320
Bar Loaded by PID: 41320
Foo Loaded by PID: 23864
Worker Process loaded in PID: 23864
Foo Loaded by PID: 90256
Worker Process loaded in PID: 90256
Foo Loaded by PID: 123352
Worker Process loaded in PID: 123352

,如您所见,foo.py正在与工人相同的过程中导入以重新创建实例。

那么,什么是解决方案

您可以使用经理。这些产生了一个服务器进程,其中创建和存储了托管对象,从而允许其他进程访问其共享状态而无需创建重复。在您的情况下,foo.py仍将导入一个额外的时间来在服务器进程中创建实例。但是,在完成此操作之后,您可以自由地将实例传递给其他过程,而不必担心在该过程中导入的模块foo.py

main.py

from multiprocessing import Process
from multiprocessing.managers import BaseManager
import os

# target func for new process
def init_child(foo_obj, bar_obj):
    print(f"Worker Process loaded in PID: {os.getpid()}")
    pass

if __name__ == "__main__":
    # protect imports from child process
    import ultra_random_test
    from foo import get_foo_with_manager, Foo
    from bar import getBar
    BaseManager.register('Foo', Foo)
    manager = BaseManager()
    manager.start()
    # get new objects
    foo_obj = get_foo_with_manager(manager)
    bar_obj = getBar()
    # start new process
    for _ in range(3):
        print()
        child_p = Process(target=init_child, args=(foo_obj, bar_obj))
        child_p.start()
        # Wait for process to join
        child_p.join()

foo.py

import os
print('Foo Loaded by PID: ' + str(os.getpid()))

class Foo:
    def __init__(self):
        pass

def get_foo_with_manager(manager):
    return manager.Foo()

输出

Foo Loaded by PID: 38332
Bar Loaded by PID: 38332
Foo Loaded by PID: 56632
Worker Process loaded in PID: 63404
Worker Process loaded in PID: 66400
Worker Process loaded in PID: 56044

您可以看到,在manager创建的服务器过程中仅实例化foo_obj。之后,foo.py再也没有进口。

经理的基准

i通过比较使用NamespaceProxy获取属性值的速度来测试代理与非托管对象的速度相比的速度:

import time
from multiprocessing.managers import BaseManager, NamespaceProxy

class TimeNamespaceProxy(NamespaceProxy):
    def __getattr__(self, key):
        t = time.time()
        super().__getattr__(key)
        return time.time() - t

class A:
    def __init__(self):
        self.num = 3

def worker(proxy_a):
    times = []
    for _ in range(10000):
        times.append(proxy_a.num)
    print(
        f'Time taken for 10000 calls to get attribute with proxy is : {sum(times)}, with the avg being: {sum(times) / len(times)}')

if __name__ == "__main__":
    BaseManager.register('A', A, TimeNamespaceProxy, exposed=tuple(dir(TimeNamespaceProxy)))
    manager = BaseManager()
    manager.start()
    num = 10000
    a = A()
    proxy_a = manager.A()
    times = []
    t = time.perf_counter()
    for _ in range(num):
        a.num
    times.append(time.perf_counter() - t)
    print(f'Time taken for {num} calls to get attribute without proxy is : {sum(times)}, with the avg being: {sum(times) / num}')
    times = []
    for _ in range(10000):
        times.append(proxy_a.num)
    print(
        f'Time taken for {num} calls to get attribute with proxy is : {sum(times)}, with the avg being: {sum(times) / num}')

输出

Time taken for 10000 calls to get attribute without proxy is : 0.0011279000000000705, with the avg being: 1.1279000000000705e-07
Time taken for 10000 calls to get attribute with proxy is : 1.751499891281128, with the avg being: 0.0001751499891281128

您可以看到,不使用代理的速度超过1000倍,但是,绝对术语中,使用代理仅需大约0.2 ms即可腌制/取消函数名称 __getattribute__和参数,然后outckle/outckle/nuckle the the the结果。对于大多数应用程序,仅将功能名称发送到管理员流程将不是瓶颈。只有在要发送的返回值/参数很复杂并且需要更长的时间时,事情才会变得棘手,但是在使用多处理时,这些是常见的恶作剧(我建议您在可能的情况下将重复的参数存储在托管对象本身中,以便在可能的情况下减少开销(。

简而言之,实施过程安全将始终在性能方面取决于权衡。如果您的用例需要特别性能的代码,那么您可能需要重新评估您对语言的选择,因为Python

的权衡特别大

相关内容

  • 没有找到相关文章

最新更新