我正在测试一段在两个进程之间共享 numpy 数组的 Python 代码：
from multiprocessing import Process, Semaphore, shared_memory
import numpy as np
import time
def reader(id, a, shm, delay=2):
    """Attach to the shared block named *shm*, wait, then print its first element.

    Parameters:
        id: unused; kept so the signature matches the ``args`` tuple the
            caller passes to ``Process``.
        a: template array; only its ``shape`` and ``dtype`` are used to
            interpret the shared block.
        shm: name of an existing ``shared_memory.SharedMemory`` block.
        delay: seconds to sleep before reading (default 2, the original
            hard-coded value). This is a crude way to let ``worker`` write
            first; it is not real synchronization.
    """
    exst_shm = shared_memory.SharedMemory(name=shm)
    b = np.ndarray(a.shape, dtype=a.dtype, buffer=exst_shm.buf)
    try:
        time.sleep(delay)
        print('FUNCTION VERSION: ', b[0])
    finally:
        # The view must be released before close(): SharedMemory.close()
        # raises BufferError while an ndarray still exports its buffer.
        del b
        # Detach this process's handle; unlinking is left to the creator.
        exst_shm.close()
def worker(id, a, shm):
    """Attach to the shared block named *shm* and add 10 to its first element in place.

    Parameters:
        id: unused; kept so the signature matches the ``args`` tuple the
            caller passes to ``Process``.
        a: template array; only its ``shape`` and ``dtype`` are used to
            interpret the shared block.
        shm: name of an existing ``shared_memory.SharedMemory`` block.
    """
    exst_shm = shared_memory.SharedMemory(name=shm)
    b = np.ndarray(a.shape, dtype=a.dtype, buffer=exst_shm.buf)
    try:
        # Writes through to the shared block, so other processes see it.
        b[0] += 10
    finally:
        # Release the view before close(); close() raises BufferError
        # while an ndarray still exports the buffer.
        del b
        exst_shm.close()
if __name__ == "__main__":
    # `a` is a template: its shape/dtype size the shared block and its
    # contents seed it.
    a = np.array([0])
    shm = shared_memory.SharedMemory(create=True, size=a.nbytes)
    # Parent-side view of the shared block.
    c = np.ndarray(a.shape, dtype=a.dtype, buffer=shm.buf)
    try:
        # Seed the block explicitly rather than relying on the initial
        # contents of a freshly created segment.
        c[:] = a[:]
        th1 = Process(target=reader, args=(1, a, shm.name))
        th2 = Process(target=worker, args=(2, a, shm.name))
        th1.start()
        th2.start()
        th1.join()
        th2.join()
    finally:
        # Drop the view first: SharedMemory.close() raises BufferError
        # while an ndarray still exports its buffer.
        del c
        shm.close()
        # The creator unlinks the block exactly once; otherwise the
        # segment is leaked (on POSIX it outlives the process).
        shm.unlink()
上面的代码工作正常，它打印出：FUNCTION VERSION:  10
然后我把它改写成了 Process 的子类：
from multiprocessing import Process, Semaphore, shared_memory
import numpy as np
import time
class Reader(Process):
    """Failing version from the question: does all of its work in __init__.

    Everything below runs while ``Reader(...)`` is being constructed, i.e. in
    the constructing (main) process, not in the child launched by start().
    The class neither overrides run() nor passes ``target=``, so the child
    started by start() has nothing to execute.
    """
    def __init__(self, id, a, shm):
        Process.__init__(self)  # no target= passed
        exst_shm = shared_memory.SharedMemory(name=shm)
        b = np.ndarray(a.shape, dtype=a.dtype, buffer=exst_shm.buf)
        # Worker(...) is constructed only after this returns (see the order in
        # the __main__ block), so nothing has added 10 yet when this prints;
        # the question reports 0 here.
        time.sleep(2)
        print('SUBCLASS VERSION: ', b[0])
class Worker(Process):
    """Failing version from the question: does its work in __init__.

    The shared block is attached and incremented while ``Worker(...)`` is
    being constructed, in the constructing (main) process. run() is not
    overridden and no ``target=`` is passed, so the child launched by start()
    has nothing to execute.
    """
    def __init__(self, id, a, shm):
        Process.__init__(self)  # no target= passed
        exst_shm = shared_memory.SharedMemory(name=shm)
        b = np.ndarray(a.shape, dtype=a.dtype, buffer=exst_shm.buf)
        # Runs in the constructing process, after Reader(...) has already
        # slept and printed (see the order in the __main__ block).
        b[0] += 10
if __name__ == "__main__":
    # `a` is a template: its shape/dtype size the shared block and its
    # contents seed it.
    a = np.array([0])
    shm = shared_memory.SharedMemory(create=True, size=a.nbytes)
    # Parent-side view of the shared block.
    c = np.ndarray(a.shape, dtype=a.dtype, buffer=shm.buf)
    try:
        # Seed the block explicitly rather than relying on the initial
        # contents of a freshly created segment.
        c[:] = a[:]
        th1 = Reader(1, a, shm.name)
        th2 = Worker(2, a, shm.name)
        th1.start()
        th2.start()
        th1.join()
        th2.join()
    finally:
        # Drop the view first: SharedMemory.close() raises BufferError
        # while an ndarray still exports its buffer.
        del c
        shm.close()
        # The creator unlinks the block exactly once; otherwise the
        # segment is leaked (on POSIX it outlives the process).
        shm.unlink()
然而，改写成类之后，它打印出：SUBCLASS VERSION: 0。差异从何而来？代码有什么问题？
你创建类的方式不对。
正常情况下，start() 会在新进程中调用 run() 方法，而 run() 负责执行传给 Process 的 target。
当你创建自己的子类时，必须重写 run() 方法并把代码放进 run() 中，这样调用 start() 时，这段代码才会在新进程中运行。
在你的版本中，所有代码都写在 __init__ 里，因此都在主进程中运行，而不是在新进程中运行。首先，你创建 Reader()，它在构造时休眠 2 秒并打印 b[0]。此时 Worker() 还没有被创建，所以打印出的是 0。然后你创建 Worker()，它执行 b[0] += 10。之后才调用 start() 启动进程，但这些进程什么也不做，因为你既没有重写 run()，也没有传入 target。
from multiprocessing import Process, Semaphore, shared_memory
import numpy as np
import time
class Reader(Process):
    """Process that prints the first element of a shared NumPy array.

    Only the shared-memory name, shape and dtype are stored on the instance;
    the block is attached inside run(), i.e. in the child process. Attaching
    in __init__ and keeping the ndarray on ``self`` only works with the "fork"
    start method. Under "spawn" (the default on Windows and macOS) the Process
    object is pickled to the child, and pickling an ndarray copies its data,
    so the child would read a private copy instead of the shared block.

    Parameters:
        id: unused; kept for signature compatibility.
        a: template array; only its ``shape`` and ``dtype`` are used.
        shm: name of an existing ``shared_memory.SharedMemory`` block.
        delay: seconds to sleep in run() before reading (default 2). This is
            a crude way to let the writer go first, not real synchronization.
    """

    def __init__(self, id, a, shm, delay=2):
        super().__init__()
        # Plain, picklable metadata only; no handles or views created here.
        self._shm_name = shm
        self._shm_shape = a.shape
        self._shm_dtype = a.dtype
        self._read_delay = delay

    def run(self):
        """Attach to the shared block in this process, wait, print b[0], detach."""
        exst_shm = shared_memory.SharedMemory(name=self._shm_name)
        b = np.ndarray(self._shm_shape, dtype=self._shm_dtype, buffer=exst_shm.buf)
        try:
            time.sleep(self._read_delay)
            print('SUBCLASS VERSION: ', b[0])
        finally:
            # Release the view before close(); close() raises BufferError
            # while an ndarray still exports the buffer.
            del b
            exst_shm.close()
class Worker(Process):
    """Process that adds 10 to the first element of a shared NumPy array.

    Only the shared-memory name, shape and dtype are stored on the instance;
    the block is attached inside run(), i.e. in the child process. Keeping
    an ndarray view on ``self`` from __init__ breaks under the "spawn" start
    method (default on Windows and macOS): the Process object is pickled to
    the child, and pickling an ndarray copies its data, so the increment
    would land in a private copy rather than in shared memory.

    Parameters:
        id: unused; kept for signature compatibility.
        a: template array; only its ``shape`` and ``dtype`` are used.
        shm: name of an existing ``shared_memory.SharedMemory`` block.
    """

    def __init__(self, id, a, shm):
        super().__init__()
        # Plain, picklable metadata only; no handles or views created here.
        self._shm_name = shm
        self._shm_shape = a.shape
        self._shm_dtype = a.dtype

    def run(self):
        """Attach to the shared block in this process, add 10 to b[0], detach."""
        exst_shm = shared_memory.SharedMemory(name=self._shm_name)
        b = np.ndarray(self._shm_shape, dtype=self._shm_dtype, buffer=exst_shm.buf)
        try:
            b[0] += 10
        finally:
            # Release the view before close(); close() raises BufferError
            # while an ndarray still exports the buffer.
            del b
            exst_shm.close()
if __name__ == "__main__":
    # `a` is a template: its shape/dtype size the shared block and its
    # contents seed it.
    a = np.array([0])
    shm = shared_memory.SharedMemory(create=True, size=a.nbytes)
    # Parent-side view of the shared block.
    c = np.ndarray(a.shape, dtype=a.dtype, buffer=shm.buf)
    try:
        # Seed the block explicitly rather than relying on the initial
        # contents of a freshly created segment.
        c[:] = a[:]
        th1 = Reader(1, a, shm.name)
        th2 = Worker(2, a, shm.name)
        th1.start()
        th2.start()
        th1.join()
        th2.join()
    finally:
        # Drop the view first: SharedMemory.close() raises BufferError
        # while an ndarray still exports its buffer.
        del c
        shm.close()
        # The creator unlinks the block exactly once; otherwise the
        # segment is leaked (on POSIX it outlives the process).
        shm.unlink()
顺便说一句：
可以运行以下代码：
import multiprocessing
# Print where the multiprocessing package is installed; its process.py
# (mentioned below) is in that package directory.
print(multiprocessing.__file__)
打印出 multiprocessing 包的路径，然后在同一目录下的 process.py 中查看源代码，了解它是如何工作的。例如，其中 run() 方法的实现如下：
def run(self):
    '''
    Method to be run in sub-process; can be overridden in sub-class
    '''
    # Excerpt from multiprocessing/process.py. The default run() only calls
    # the target= callable (with args=/kwargs=) given to the constructor, so
    # a subclass that passes no target= and does not override run() gets a
    # no-op when start() launches the child process.
    if self._target:
        self._target(*self._args, **self._kwargs)