我处理2个大型1D数组(比如A和B(:我对A和B元素对进行运算,并将结果写入共享数组C(将C视为直方图(。我想使用多处理来并行化进程。我认为最佳的方法可以是将数组A分割成多个唯一的块,这些块的数量等于我选择执行的并行进程的数量,并使用for循环对B的所有元素进行计算。
我读了许多问题/答案。我以python中写入数组的函数的多处理循环为例,使用Process。我试图适应我的问题,但我得到了串行执行的性能。我正在测试的代码:
from multiprocessing import Process, Lock
from multiprocessing.sharedctypes import Array
import numpy as np
import time
def ProcessData(sub_data1, data2, freq):
for dat1 in sub_data1:
for dat2 in data2:
d = int( np.sqrt( (dat1 - dat2)**2 ) )
#d = int(dat1 - dat2)
if (d < len(freq)):
freq[d] += 1
def SplitList(data, n):
sub_len = divmod(len(data),n)[0]
print(sub_len)
slices = []
for i in range(n):
slices.append( data[i*sub_len:i*sub_len+sub_len] )
return slices
def main(nproc):
print("Number of cpu : ", mp.cpu_count())
lock = Lock()
N = 30
chip = [1,1,1,1,1,2,2,2,2,2,3,3,3,3,4,4,4,4,4,5,5,5,5,5,6,6,6,6,6,7,7,7,7,7,8,8,8,8,8,9,9,9,9]
data1 = np.array( chip * N )
data2 = np.array( chip * N )
freq = Array('i', 100, lock=lock)
dat1_subs = SplitList(data1,nproc)
print('Number of data1 slices {:d}'.format(len(dat1_subs)))
t_start = time.time()
if __name__ == '__main__':
for i in range(0, nproc):
print('LEN {:d}: {:d}'.format(i, len(dat1_subs[i] )) )
p = Process(target=ProcessData, args=(dat1_subs[i], data2, freq))
p.start()
p.join()
t_end = time.time()
print('Total time (s)= ' + str(t_end - t_start))
print(str(list(freq)))
#new_array = np.frombuffer(freq.get_obj())
Sum = sum( list(freq) )
print('Total {:d}'.format(Sum))
NProc = 4
main(NProc)
如果有任何意见或暗示我做错了什么,我将不胜感激。或者可能还有更简单的方法,我只是不知道。谢谢
像这样尝试
from concurrent.futures import ProcessPoolExecutor
def pool_factorizer_map(nums, nprocs):
# Let the executor divide the work among processes by using 'map'.
with ProcessPoolExecutor(max_workers=nprocs) as executor:
return {num:factors for num, factors inmething like this
executor.map(factorize_naive, nums))}
来自您的评论:
一旦用不同数量的cpu处理,共享数组(累积计数的"req"(在某些绑定中的计数可能略有不同,这是什么原因?
如果即使不更改CPU数量,每次运行都没有得到不同的结果,我会感到惊讶。如果你真的改变了它们;明显的";第一个原因是SplitLists()
可以丢弃不同数量的尾随数据,这取决于传递给它的n
但即使你不改变进程的数量,
freq[d] += 1
不是确定性的。是的,Array
类型是跨进程同步的,但这仅适用于加载和存储本身。这种操作是复杂的,在掩护下的工作就像
with lock:
temp = freq[d]
temp += 1
with lock:
freq[d] = temp
没有什么可以阻止多个进程读取相同的当前freq[d]
值,每个进程自己加1,然后多次存储相同的新值。
为了使增量作为一个整体成为原子,您需要传递一个不同的锁(在这种情况下,您还可以使用RawArray
(,然后执行:
with lock:
freq[d] += 1
但是,对该锁的激烈竞争将扼杀性能。如果没有"太多";可能的d
值,传递一个len(freq)
不同锁对象的数组会更好(就速度而言(,并执行:
with locks[d]:
freq[d] += 1
然后,只有当多个进程碰巧更改相同的d
计数时,才会发生锁争用。
但如果len(freq)
不是那么大,我会完全避免共享内存。让每个进程使用自己的freq
列表全速运行,并让主程序对它们进行汇总。这里有一个例子,但它与您发布的代码完全不同。没有numpy(这与这里的问题无关(,也没有你从链接到的帖子中继承的任何奇怪的东西:
def work(raw, freqlen):
freq = [0] * freqlen
for x in raw:
if x < freqlen:
freq[x] += 1
return freq
def main(nproc, nfreq, numperchunk=100000):
import multiprocessing as mp
base = list(range(200)) * 1000000
with mp.Pool(processes=nproc) as pool:
i = 0
ps = []
while i < len(base):
ps.append(pool.apply_async(work,
(base[i : i + numperchunk],
nfreq)))
i += numperchunk
result = [0] * nfreq
for p in ps:
for i, x in enumerate(p.get()):
result[i] += x
print(result)
if __name__ == "__main__":
main(4, 10)
共享内存实际上可能有帮助的地方:分割一个巨大的向量并将切片传递给工作进程是昂贵的(大量的进程间通信(。最好将只读的巨型向量放在共享内存中,只将切片索引传递给工作线程。或者,在Linux-y系统上,让工作人员通过写时复制fork()
语义在模块级别继承巨型向量。
权衡
为了使其中一些想法具体化,这里有一个更像你原来的变体。但是:
- 它把";巨型矢量";共享内存中
- 由于向量是共享的,所以只需要将切片边界传递给工作者。所需的进程间通信要少得多
freq
矢量也是共享的,但是工作者首先使用自己的本地版本,以获得峰值无锁定速度。它只在最后持有一个锁,将其本地结果折叠到共享结果中- 因为我们正在进行自己的锁定,所以使用了
RawArray
。获取这些信息的速度要快得多
这尖叫起来。事实上,该程序创建测试用例的时间远远多于计算直方图的时间;-(
def work(base, lo, hi, freq, L):
freqlen = len(freq)
myfreq = [0] * freqlen
for i in range(lo, min(hi, len(base))):
x = base[i]
if x < freqlen:
myfreq[x] += 1
with L:
for i, x in enumerate(myfreq):
freq[i] += x
def main(nproc, nfreq):
import multiprocessing as mp
import math
base = mp.RawArray('h', list(range(201)) * 1000003)
freq = mp.RawArray('i', nfreq)
L = mp.Lock()
numperchunk = math.ceil(len(base) / nproc)
print(f"{len(base)=:,} {numperchunk=:,}")
ps = []
a = 0
for i in range(nproc):
p = mp.Process(target=work,
args=(base, a, a + numperchunk, freq, L))
p.start()
a += numperchunk
ps.append(p)
for p in ps:
p.join()
print(list(freq))
if __name__ == "__main__":
main(4, 10)