Fastest way to share large objects between Python processes



>Suppose I have 3 different processes, each executing different logic in an infinite loop. I want to run all of them in parallel, with each process having access to a shared_object, a heavy instance of a class. So I tried to achieve this with multiprocessing and a Manager, like so:

import multiprocessing
import inspect
from multiprocessing.managers import BaseManager, NamespaceProxy
import time
import random

class SharedObject():
    def __init__(self):
        self.a = 1

    def show_a(self):
        print(self.a)

class ProcessManager(BaseManager):
    pass

class ProxyBase(NamespaceProxy):
    _exposed_ = ('__getattribute__', '__setattr__', '__delattr__')

class ManagerProxy(ProxyBase):
    pass

def register_proxy(name, cls, proxy):
    for attr in dir(cls):
        if callable(getattr(cls, attr)) and not attr.startswith("__"):
            proxy._exposed_ += (attr,)
            setattr(proxy, attr,
                    lambda s: object.__getattribute__(s, '_callmethod')(attr))
    ProcessManager.register(name, cls, proxy)

register_proxy('shared_object', SharedObject, ManagerProxy)
process_manager = ProcessManager()
process_manager.start()
shared_object = process_manager.shared_object()

def process_1():
    while True:
        print('Process 1 see {}'.format(shared_object.a))
        shared_object.a = 1
        time.sleep(1)

def process_2():
    while True:
        print('Process 2 see {}'.format(shared_object.a))
        shared_object.a = 2
        time.sleep(1)

def process_3():
    while True:
        print('Process 3 see {}'.format(shared_object.a))
        shared_object.a = 3
        if random.randint(0, 1) == 1:
            shared_object.show_a()
        time.sleep(1)

first_process = multiprocessing.Process(name="First process", target=process_1)
first_process.start()
second_process = multiprocessing.Process(name="Second process", target=process_2)
second_process.start()
third_process = multiprocessing.Process(name="Third process", target=process_3)
third_process.start()

shared_object.show_a()
while True:
    time.sleep(10)

It works, but it is too slow for me, because I have to pass large numpy arrays. Is there any other way to make it faster (real-time speed)? Thanks a lot.

This looks like a problem that multiprocessing.shared_memory solves, but a) it appears to require Python 3.8+, and b) the code would need restructuring, at least to:

  • allocate the right size
  • pass the name of the shared block to the processes
  • and remember to close it at the end

Edit:

Since I couldn't get it to work with Python 3.7, I decided to use the shared-memory primitives available since 3.5+: Array (and Value, which may be just what you need). The following code runs happily:

import time
import random
from multiprocessing import Process, Array

s1 = Array('i', [1])

def process_1():
    while True:
        print('Process 1 see {}'.format(s1[0]))
        s1[0] = 1
        time.sleep(1)

def process_2():
    while True:
        print('Process 2 see {}'.format(s1[0]))
        s1[0] = 2
        time.sleep(1)

def process_3():
    while True:
        print('Process 3 see {}'.format(s1[0]))
        s1[0] = 3
        if random.randint(0, 1) == 1:
            print(s1[0])
        time.sleep(1)

first_process = Process(name="First process", target=process_1)
first_process.start()
second_process = Process(name="Second process", target=process_2)
second_process.start()
third_process = Process(name="Third process", target=process_3)
third_process.start()

while True:
    time.sleep(10)

and obtain:

Process 1 see 1
Process 2 see 1
Process 3 see 1
Process 1 see 3
Process 2 see 1
Process 3 see 2
3
Process 1 see 3
Process 2 see 1
Process 3 see 2
3
[...]

I would still pass the array to the processes, like this:

def process_1(shared):
    ...

and then

Process(name="First process", args=(s1,), target=process_1)

just to make it clearer what each process is doing.

Also, since I haven't tried it with BIG objects, I'm not quite sure how it would fare...
