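I want to use Python multiprocessing to write data into the same list, and I use mp.Manager().list to share data between processes. The code below is just a demo: I want to append equal numbers to the same list. However, the counter increases while grp stays unchanged. What is going wrong?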
import multiprocessing as mp
import random
import time
import numpy as np


class A:
    def __init__(self):
        self.raw = [random.randint(1, 4) for _ in range(100)]
        self.manager = mp.Manager()
        self.grp = self.manager.list([[1], [2], [3], [4]])
        self.use_cpu_num = 2
        self.counter = self.manager.Value('i', 0)

    def run(self):
        subsets = np.array_split(self.raw, self.use_cpu_num)
        subsets = [each.tolist() for each in subsets]
        process = []
        for i in range(self.use_cpu_num):
            process.append(mp.Process(target=self.process, args=(subsets[i], )))
        for each in process:
            each.start()
        for each in process:
            each.join()
            each.close()
        print(self.grp)

    def process(self, subset):
        for each in subset:
            for i in range(len(self.grp)):
                each_grp = self.grp[i]
                if each in each_grp:
                    self.counter.set(self.counter.value + 1)
                    self.grp[i].append(each)
        print(self.counter.value)


if __name__ == '__main__':
    a = A()
    a.run()
I tried using mp.Lock(), but it does not share data between the processes.
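Put it this way: self.grp is a managed object, so any change made to it with self.grp.append or self.grp[i] = x is forwarded to the manager process.

The objects inside self.grp are not managed. Changes made to them are never sent to the manager, and self.grp[i] only gives you a copy of them.

To allow the lists inside self.grp to be modified, those lists must themselves be manager.list objects. Note that Python versions below 3.6 do not support nested managed objects.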
self.grp = self.manager.list([self.manager.list(x) for x in ([1], [2], [3], [4])])
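The difference between a plain inner list and a managed inner list can be seen in a single process. The following is a minimal sketch, not part of the original answer; the function name compare_nested_lists and the variable names are made up for illustration. It also shows the reassignment route (self.grp[i] = x) mentioned above, which works even when the inner lists are plain.

```python
import multiprocessing as mp


def compare_nested_lists():
    """Append through a plain nesting and a managed nesting, then return both."""
    with mp.Manager() as manager:
        # Outer list is managed, inner lists are ordinary Python lists.
        plain_inner = manager.list([[1], [2]])
        # Indexing returns a local copy, so this append never reaches the manager.
        plain_inner[0].append(1)

        # Inner lists are managed too (requires Python 3.6+).
        managed_inner = manager.list([manager.list([1]), manager.list([2])])
        # Indexing returns a proxy, so this append is applied in the manager process.
        managed_inner[0].append(1)

        # Workaround for plain inner lists: change the copy, then assign it back.
        local_copy = plain_inner[1]
        local_copy.append(2)
        plain_inner[1] = local_copy

        # Convert to ordinary lists before the manager shuts down.
        return list(plain_inner), [list(x) for x in managed_inner]


if __name__ == "__main__":
    print(compare_nested_lists())
```

The first inner list of the plain nesting stays [1], while the same append on the managed nesting gives [1, 1]; the reassigned element becomes [2, 2].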
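If you are only storing numbers, you can use multiprocessing.Array instead. For convenience it can be wrapped in a numpy ndarray, but you cannot append to it and its size must be known in advance.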
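As a rough sketch of the multiprocessing.Array route (the helper name make_shared_counts and the choice of a C int element type are assumptions for illustration, not something the original answer specifies), the shared buffer can be viewed as an ndarray without copying:

```python
import multiprocessing as mp
import numpy as np


def make_shared_counts(size):
    """Create a fixed-size shared int array and a NumPy view onto the same memory."""
    # 'i' is a C int; the array starts zero-filled and cannot grow later.
    shared = mp.Array('i', size)
    # get_obj() returns the underlying ctypes array; frombuffer wraps it without copying.
    view = np.frombuffer(shared.get_obj(), dtype=np.intc)
    return shared, view


if __name__ == "__main__":
    shared, view = make_shared_counts(4)
    with shared.get_lock():   # the Array carries its own lock by default
        view[2] += 5          # write through the NumPy view
    print(shared[:])          # the shared buffer reflects the write
```

Because the size is fixed, this suits counters or fixed-length numeric buffers rather than lists that grow as in the question.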
Edit: on Windows you get an error when the self.manager object is pickled, so in the example below I moved the manager out of the class.
import multiprocessing as mp
import random
import time
import numpy as np


class A:
    def __init__(self):
        self.raw = [random.randint(1, 4) for _ in range(100)]
        # Inner lists are managed too, so appends to them reach the manager.
        # `manager` is the module-level Manager created under __main__ below.
        self.grp = manager.list([manager.list(x) for x in ([1], [2], [3], [4])])
        self.use_cpu_num = 2
        self.counter = manager.Value('i', 0)

    def run(self):
        # Split the raw data into one chunk per worker process.
        subsets = np.array_split(self.raw, self.use_cpu_num)
        subsets = [each.tolist() for each in subsets]
        process = []
        for i in range(self.use_cpu_num):
            process.append(mp.Process(target=self.process, args=(subsets[i], )))
        for each in process:
            each.start()
        for each in process:
            each.join()
            each.close()
        # Convert the inner proxies to plain lists for printing.
        print([list(x) for x in self.grp])

    def process(self, subset):
        for each in subset:
            for i in range(len(self.grp)):
                each_grp = self.grp[i]  # a proxy to the managed inner list
                if each in each_grp:
                    # Note: this read-then-set is not atomic across processes.
                    self.counter.set(self.counter.value + 1)
                    self.grp[i].append(each)
        print(self.counter.value)


if __name__ == '__main__':
    manager = mp.Manager()
    a = A()
    a.run()
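One thing the example above leaves open is that counter.set(counter.value + 1) is a separate read and write, so two processes can overwrite each other's increments. A possible way to guard it, sketched here as an assumption rather than as part of the original answer, is a lock created by the same manager and passed to the workers; the function names add_many and count_with_lock are hypothetical.

```python
import multiprocessing as mp


def add_many(counter, lock, times):
    """Increment the shared counter `times` times, one locked update at a time."""
    for _ in range(times):
        # Reading and writing are two separate manager calls;
        # holding the lock keeps another process from interleaving between them.
        with lock:
            counter.value += 1


def count_with_lock(workers=2, times=200):
    """Run `workers` processes that all increment one managed counter."""
    with mp.Manager() as manager:
        counter = manager.Value('i', 0)
        lock = manager.Lock()  # a managed lock can be passed to child processes
        procs = [mp.Process(target=add_many, args=(counter, lock, times))
                 for _ in range(workers)]
        for p in procs:
            p.start()
        for p in procs:
            p.join()
        return counter.value


if __name__ == "__main__":
    print(count_with_lock())
```

With the lock held around each update, the final value should equal workers * times; without it, some increments may be lost.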