Python multiprocessing (split data into smaller chunks - multiple function arguments)



Note from 22 Feb 2021: my problem might also be solved by more efficient memory usage instead of multiprocessing, since I realized that the memory load gets very high and may well be a limiting factor.


I am trying to reduce the running time of my script by using multiprocessing. In the past I got some good tips on speeding up the function itself (improve performance of a np.where() loop), but now I would like to use all cores of a 32-core workstation. My function compares the entries of two lists (X and Y) against reference lists Q and Z. For each element of X/Y it checks whether X[i] occurs somewhere in Q and Y[i] occurs at the same position in Z. (Note: my real data consists of DNA sequencing reads, and I need to map my reads to a reference.)
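For illustration, here is a minimal toy example of the matching I am after (made-up values, not my real reads):

# Toy illustration of the matching (invented data):
# for each pair (X[i], Y[i]) we want every position j where Q[j] == X[i] and Z[j] == Y[i]
Q = ['a', 'b', 'a']
Z = ['1', '2', '1']
X = ['a', 'b', 'c']
Y = ['1', '2', '3']
index = ['read0', 'read1', 'read2']
# expected result: {'read0': [0, 2], 'read1': [1], 'read2': []}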

What I have tried so far:

  • Splitting my long lists X and Y into even chunks (n chunks, where n == cpu_count)
  • Trying concurrent.futures.ProcessPoolExecutor() to run the function for each "sublist" in parallel, and finally combining the results of all processes into one final dictionary (matchdict). (--> see the commented-out section)

My problem:

  • When I uncomment the multiprocessing part, all cores are used, but it ends with an error (index out of range) that I have not been able to solve yet. (-> Tip: lower N to 1000 and you will see the error right away without having to wait long)

Does anyone know how to solve this, or can suggest a better way to use multiprocessing in my code?

Here is the code:

import numpy as np
import multiprocessing
import concurrent.futures

np.random.seed(1)

def matchdictfunc(index, x, y, q, z):  # function to match entries of X and Y to Q and Z and get index of Q/Z where their values match X/Y
    lookup = {}
    for i, (q, z) in enumerate(zip(Q, Z)):
        lookup.setdefault((q, z), []).append(i)
    matchlist = [lookup.get((x, y), []) for x, y in zip(X, Y)]
    matchdict = {}
    for ind, match in enumerate(matchlist):
        matchdict[index[ind]] = match

    return matchdict

def split(a, n):  # function to split list in n even parts
    k, m = divmod(len(a), n)
    return list((a[i * k + min(i, m):(i + 1) * k + min(i + 1, m)] for i in range(n)))

def splitinput(index, X, Y, Q, Z):  # split large lists X and Y in n even parts (n = cpu_count), make new list containing n times Q and Z (to feed Q and Z to every process)
    cpu_count = multiprocessing.cpu_count()
    # create multiple chunks for X and Y and index:
    index_split = split(index, cpu_count)
    X_split = split(X, cpu_count)
    Y_split = split(Y, cpu_count)
    # create list with several times Q and Z since it needs to be same length as X_split etc.:
    Q_mult = []
    Z_mult = []
    for _ in range(cpu_count):
        Q_mult.append(Q)
        Z_mult.append(Z)
    return index_split, X_split, Y_split, Q_mult, Z_mult

# N will finally scale up to 10^9
N = 10000000
M = 300

index = [str(x) for x in list(range(N))]
X = np.random.randint(M, size=N)
Y = np.random.randint(M, size=N)

# Q and Z size is fixed at 120000
Q = np.random.randint(M, size=120000)
Z = np.random.randint(M, size=120000)

# convert int32 arrays to str64 arrays and then to list, to represent original data (which are strings and not numbers)
X = np.char.mod('%d', X).tolist()
Y = np.char.mod('%d', Y).tolist()
Q = np.char.mod('%d', Q).tolist()
Z = np.char.mod('%d', Z).tolist()

# single-core:
matchdict = matchdictfunc(index, X, Y, Q, Z)

# split lists to number of processors (cpu_count)
index_split, X_split, Y_split, Q_mult, Z_mult = splitinput(index, X, Y, Q, Z)
## Multiprocessing attempt - FAILS! (index out of range)
# finallist = []
# if __name__ == '__main__':
#     with concurrent.futures.ProcessPoolExecutor() as executor:
#         results = executor.map(matchdictfunc, X_split, Y_split, Q_mult, Z_mult)
#         for result in results:
#             finallist.append(result)
#
#         matchdict = {}
#         for d in finallist:
#             matchdict.update(d)

Your function matchdictfunc currently has parameters x, y, q and z but does not actually use them, although the multiprocessing version will need to use two of them. There is also no need for function splitinput to replicate Q and Z into return values Q_split and Z_split. Currently matchdictfunc expects Q and Z to be global variables, and we can arrange for that in the multiprocessing version by using the initializer and initargs arguments when constructing the pool. You should also move the code that does not need to be executed by the child processes, such as the array-initialization code, into a block governed by if __name__ == '__main__':. These changes result in:

import numpy as np
import multiprocessing
import concurrent.futures

MULTIPROCESSING = True

def init_pool(q, z):
    global Q, Z
    Q = q
    Z = z

def matchdictfunc(index, X, Y):  # function to match entries of X and Y to Q and Z and get index of Q/Z where their values match X/Y
    lookup = {}
    for i, (q, z) in enumerate(zip(Q, Z)):
        lookup.setdefault((q, z), []).append(i)
    matchlist = [lookup.get((x, y), []) for x, y in zip(X, Y)]
    matchdict = {}
    for ind, match in enumerate(matchlist):
        matchdict[index[ind]] = match
    return matchdict

def split(a, n):  # function to split list in n even parts
    k, m = divmod(len(a), n)
    return list((a[i * k + min(i, m):(i + 1) * k + min(i + 1, m)] for i in range(n)))

def splitinput(index, X, Y):  # split large lists X and Y in n even parts (n = cpu_count)
    cpu_count = multiprocessing.cpu_count()
    # create multiple chunks for X and Y and index:
    index_split = split(index, cpu_count)
    X_split = split(X, cpu_count)
    Y_split = split(Y, cpu_count)
    return index_split, X_split, Y_split

def main():
    # following required for non-multiprocessing
    if not MULTIPROCESSING:
        global Q, Z

    np.random.seed(1)

    # N will finally scale up to 10^9
    N = 10000000
    M = 300

    index = [str(x) for x in list(range(N))]
    X = np.random.randint(M, size=N)
    Y = np.random.randint(M, size=N)

    # Q and Z size is fixed at 120000
    Q = np.random.randint(M, size=120000)
    Z = np.random.randint(M, size=120000)

    # convert int32 arrays to str64 arrays and then to list, to represent original data (which are strings and not numbers)
    X = np.char.mod('%d', X).tolist()
    Y = np.char.mod('%d', Y).tolist()
    Q = np.char.mod('%d', Q).tolist()
    Z = np.char.mod('%d', Z).tolist()

    # for non-multiprocessing:
    if not MULTIPROCESSING:
        matchdict = matchdictfunc(index, X, Y)
    else:
        # for multiprocessing:
        # split lists to number of processors (cpu_count)
        index_split, X_split, Y_split = splitinput(index, X, Y)
        with concurrent.futures.ProcessPoolExecutor(initializer=init_pool, initargs=(Q, Z)) as executor:
            finallist = [result for result in executor.map(matchdictfunc, index_split, X_split, Y_split)]
            matchdict = {}
            for d in finallist:
                matchdict.update(d)
    #print(matchdict)

if __name__ == '__main__':
    main()

Note: I tried this with a smaller value of N = 1000 (printing out the contents of matchdict), and the multiprocessing version appears to return the same results. My machine does not have the resources to run with the full value of N without freezing up everything else.

Another Approach

I am working on the assumption that your DNA data is stored externally and that the X and Y values can either be read in n values at a time, or can be read and re-written so that this becomes possible. Then, rather than having all the data resident in memory and splitting it into 32 pieces, I propose that it be read n values at a time and thereby broken up into approximately N/n pieces.

In the code below I have switched to class multiprocessing.pool.Pool so that I can use its imap method. The advantage is that it lazily submits tasks to the process pool, that is, the iterable argument does not have to be a list or be convertible to a list. Instead the pool will iterate over the iterable, sending tasks to the pool in chunksize groups. In the code below I have used a generator function as the argument to imap, which generates successive X and Y values. Your actual generator function would first open the DNA file (or files) and read in successive portions of the file.

import numpy as np
import multiprocessing

def init_pool(q, z):
    global Q, Z
    Q = q
    Z = z

def matchdictfunc(t):  # function to match entries of X and Y to Q and Z and get index of Q/Z where their values match X/Y
    index, X, Y = t
    lookup = {}
    for i, (q, z) in enumerate(zip(Q, Z)):
        lookup.setdefault((q, z), []).append(i)
    matchlist = [lookup.get((x, y), []) for x, y in zip(X, Y)]
    matchdict = {}
    for ind, match in enumerate(matchlist):
        matchdict[index[ind]] = match
    return matchdict

def next_tuple(n, stop, M):
    start = 0
    while True:
        end = min(start + n, stop)
        index = [str(x) for x in list(range(start, end))]
        x = np.random.randint(M, size=n)
        y = np.random.randint(M, size=n)
        # convert int32 arrays to str64 arrays and then to list, to represent original data (which are strings and not numbers)
        x = np.char.mod('%d', x).tolist()
        y = np.char.mod('%d', y).tolist()
        yield (index, x, y)
        start = end
        if start >= stop:
            break

def compute_chunksize(XY_AT_A_TIME, N):
    n_tasks, remainder = divmod(N, XY_AT_A_TIME)
    if remainder:
        n_tasks += 1
    chunksize, remainder = divmod(n_tasks, multiprocessing.cpu_count() * 4)
    if remainder:
        chunksize += 1
    return chunksize

def main():
    np.random.seed(1)

    # N will finally scale up to 10^9
    N = 10000000
    M = 300

    # Q and Z size is fixed at 120000
    Q = np.random.randint(M, size=120000)
    Z = np.random.randint(M, size=120000)

    # convert int32 arrays to str64 arrays and then to list, to represent original data (which are strings and not numbers)
    Q = np.char.mod('%d', Q).tolist()
    Z = np.char.mod('%d', Z).tolist()

    matchdict = {}
    # number of X, Y pairs at a time:
    # experiment with this, especially as N increases:
    XY_AT_A_TIME = 10000
    chunksize = compute_chunksize(XY_AT_A_TIME, N)
    #print('chunksize =', chunksize) # 32 with 8 cores
    with multiprocessing.Pool(initializer=init_pool, initargs=(Q, Z)) as pool:
        for d in pool.imap(matchdictfunc, next_tuple(XY_AT_A_TIME, N, M), chunksize):
            matchdict.update(d)
    #print(matchdict)

if __name__ == '__main__':
    import time
    t = time.time()
    main()
    print('total time =', time.time() - t)

Update

I wanted to remove the use of numpy from the benchmarking. It is known that numpy uses multiprocessing for some of its operations, and when it is used in multiprocessing applications it can be the cause of reduced performance. So the first thing I did was take the OP's original program, and where the code was, for example:

import numpy as np
np.random.seed(1)
X = np.random.randint(M, size=N)
X = np.char.mod('%d', X).tolist()

I replaced it with:

import random
random.seed(1)
X = [str(random.randrange(M)) for _ in range(N)]

I then timed the OP's program to get the time spent generating the X, Y, Q and Z lists as well as the total time. On my desktop these times were approximately 20 seconds and 37 seconds respectively! So in my multiprocessing version, just generating the arguments for the process pool's processes accounts for more than half of the total running time. I also found, for the second approach, that as I increased the value of XY_AT_A_TIME the CPU utilization went down from 100% to around 50%, yet the total elapsed time improved. I have not yet figured out why this is.
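For reference, a rough sketch of how those two timings can be taken (an assumed harness of mine; it reuses matchdictfunc from the OP's original program above):

import random
import time

random.seed(1)
N = 10000000
M = 300

t0 = time.time()
index = [str(x) for x in range(N)]
X = [str(random.randrange(M)) for _ in range(N)]
Y = [str(random.randrange(M)) for _ in range(N)]
Q = [str(random.randrange(M)) for _ in range(120000)]
Z = [str(random.randrange(M)) for _ in range(120000)]
print('list generation:', time.time() - t0, 'seconds')   # roughly 20 seconds on my desktop

matchdict = matchdictfunc(index, X, Y, Q, Z)              # the OP's original single-core function
print('total:', time.time() - t0, 'seconds')              # roughly 37 seconds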

Next I tried to emulate how the programs would work if they were reading the data in. So I wrote 2 * N random integers to a file temp.txt, modified the OP's program to initialize X and Y from the file, and then modified the next_tuple function of my second approach as follows:

def next_tuple(n, stop, M):
    with open('temp.txt') as f:
        start = 0
        while True:
            end = min(start + n, stop)
            index = [str(x) for x in range(start, end)] # improvement
            x = [f.readline().strip() for _ in range(n)]
            y = [f.readline().strip() for _ in range(n)]
            yield (index, x, y)
            start = end
            if start >= stop:
                break
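For completeness, a minimal sketch of how such a temp.txt can be written (my assumption: one value per line, 2 * N values in total, so that the readline() calls above pick them up):

import random

random.seed(1)
N = 10000000
M = 300

# write 2 * N random values, one per line; next_tuple() above reads them back with readline()
with open('temp.txt', 'w') as f:
    for _ in range(2 * N):
        f.write('%d\n' % random.randrange(M))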

Again, the CPU utilization went down as I increased XY_AT_A_TIME (the best performance I found was with a value of 400000, with CPU utilization only around 40%).

Finally, I rewrote my first approach in an attempt to be more memory-efficient (see below). This updated version again reads the random numbers from a file, but uses generator functions for X, Y and index so that I do not need the memory for the full lists and for splitting them. Also, I would not expect the multiprocessing and non-multiprocessing versions to produce identical results, because of the way I assign the X and Y values in the two cases (a simple fix would have been to write the random numbers out to an X-value file and a Y-value file and read the values back from those two files). But that should have no effect on the running times. However, despite using the default pool size of 8, the CPU utilization was again only 30-40% (it fluctuated quite a bit) and the overall running time was almost twice the non-multiprocessing running time. But why?

import random
import multiprocessing
import concurrent.futures
import time

MULTIPROCESSING = True
POOL_SIZE = multiprocessing.cpu_count()

def init_pool(q, z):
    global Q, Z
    Q = q
    Z = z

def matchdictfunc(index, X, Y):  # function to match entries of X and Y to Q and Z and get index of Q/Z where their values match X/Y
    lookup = {}
    for i, (q, z) in enumerate(zip(Q, Z)):
        lookup.setdefault((q, z), []).append(i)
    matchlist = [lookup.get((x, y), []) for x, y in zip(X, Y)]
    matchdict = {}
    for ind, match in enumerate(matchlist):
        matchdict[index[ind]] = match
    return matchdict

def split(a):  # function to split list in POOL_SIZE even parts
    k, m = divmod(N, POOL_SIZE)
    divisions = [(i + 1) * k + min(i + 1, m) - (i * k + min(i, m)) for i in range(POOL_SIZE)]
    parts = []
    for division in divisions:
        part = [next(a) for _ in range(division)]
        parts.append(part)
    return parts

def splitinput(index, X, Y):  # split large lists X and Y in n even parts (n = POOL_SIZE)
    # create multiple chunks for X and Y and index:
    index_split = split(index)
    X_split = split(X)
    Y_split = split(Y)
    return index_split, X_split, Y_split

def main():
    global N

    # following required for non-multiprocessing
    if not MULTIPROCESSING:
        global Q, Z

    random.seed(1)

    # N will finally scale up to 10^9
    N = 10000000
    M = 300

    # Q and Z size is fixed at 120000
    Q = [str(random.randrange(M)) for _ in range(120000)]
    Z = [str(random.randrange(M)) for _ in range(120000)]

    with open('temp.txt') as f:
        # for non-multiprocessing:
        if not MULTIPROCESSING:
            index = [str(x) for x in range(N)]
            X = [f.readline().strip() for _ in range(N)]
            Y = [f.readline().strip() for _ in range(N)]
            matchdict = matchdictfunc(index, X, Y)
        else:
            # for multiprocessing:
            # split lists to number of processors (POOL_SIZE)
            # generator functions:
            index = (str(x) for x in range(N))
            X = (f.readline().strip() for _ in range(N))
            Y = (f.readline().strip() for _ in range(N))
            index_split, X_split, Y_split = splitinput(index, X, Y)
            with concurrent.futures.ProcessPoolExecutor(POOL_SIZE, initializer=init_pool, initargs=(Q, Z)) as executor:
                finallist = [result for result in executor.map(matchdictfunc, index_split, X_split, Y_split)]
                matchdict = {}
                for d in finallist:
                    matchdict.update(d)

if __name__ == '__main__':
    t = time.time()
    main()
    print('total time =', time.time() - t)

Resolution

Could the overhead of transferring the data from the main process to the subprocesses (which involves reading and writing of shared memory) be what is slowing everything down? This final version was therefore an attempt to eliminate that potential cause of the slowdown. My desktop has 8 processors. For the first approach, dividing the N = 10000000 X and Y values among them means each process should be handling N // 8 -> 1250000 values. So I wrote the random numbers out as a binary file in 16 groups of 1250000 numbers each (8 groups for X and 8 groups for Y), recording the offset and length of each of these 16 groups with the following code:

import random

random.seed(1)

with open('temp.txt', 'wb') as f:
    offsets = []
    for i in range(16):
        n = [str(random.randrange(300)) for _ in range(1250000)]
        b = ','.join(n).encode('ascii')
        l = len(b)
        offsets.append((f.tell(), l))
        f.write(b)
print(offsets)
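A small sketch (my own addition, not from the original post) of how the printed offsets translate into the X_SPECS and Y_SPECS lists used by the final version below, assuming the first 8 groups hold the X values and the last 8 hold the Y values:

# offsets is the list of (file_offset, byte_length) tuples printed above
X_SPECS = offsets[:8]   # first 8 groups were written for X
Y_SPECS = offsets[8:]   # last 8 groups were written for Y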

From these offsets I then built the lists X_SPECS and Y_SPECS that the worker function matchdictfunc can use to read in the X and Y values itself, as needed. So now, instead of passing 1250000 values at a time to this worker function, we just pass indices 0, 1, ... 7 so that it knows which group it has to read in. Shared-memory access has been completely eliminated for accessing X and Y (it is still required for Q and Z), and the disk access has been moved into the process pool. Of course the CPU utilization will not be 100%, because the worker function is doing I/O. But I found that while the running time has now improved considerably, it still offers no improvement over the original non-multiprocessing version:

OP's original program modified to read `X` and `Y` values in from file: 26.2 seconds
Multiprocessing elapsed time: 29.2 seconds

In fact, when I changed the code to use multithreading by replacing the ProcessPoolExecutor with the ThreadPoolExecutor, the elapsed time went down by almost another second, showing that there is very little contention for the Global Interpreter Lock within the worker function, i.e. most of the time is being spent in C-language code. The main work is done by:

matchlist = [lookup.get((x, y), []) for x, y in zip(X, Y)]

When we use multiprocessing, we have multiple list comprehensions and multiple zip operations (on smaller lists) being performed by separate processes, and then we assemble the results at the end. This is conjecture on my part, but taking operations that are already very efficient and scaling them down across multiple processors may not buy you any performance gain. Or put differently, I am stumped and that is my best guess.

The final version (with some additional optimizations, please note them):

import random
import concurrent.futures
import time

POOL_SIZE = 8

X_SPECS = [(0, 4541088), (4541088, 4541824), (9082912, 4540691), (13623603, 4541385), (18164988, 4541459), (22706447, 4542961), (27249408, 4541847), (31791255, 4542186)]
Y_SPECS = [(36333441, 4542101), (40875542, 4540120), (45415662, 4540802), (49956464, 4540971), (54497435, 4541427), (59038862, 4541523), (63580385, 4541571), (68121956, 4542335)]

def init_pool(q_z):
    global Q_Z
    Q_Z = q_z

def matchdictfunc(index, i):  # function to match entries of X and Y to Q and Z and get index of Q/Z where their values match X/Y
    x_offset, x_len = X_SPECS[i]
    y_offset, y_len = Y_SPECS[i]
    with open('temp.txt', 'rb') as f:
        f.seek(x_offset, 0)
        X = f.read(x_len).decode('ascii').split(',')
        f.seek(y_offset, 0)
        Y = f.read(y_len).decode('ascii').split(',')
    lookup = {}
    for i, (q, z) in enumerate(Q_Z):
        lookup.setdefault((q, z), []).append(i)
    matchlist = [lookup.get((x, y), []) for x, y in zip(X, Y)]
    matchdict = {}
    for ind, match in enumerate(matchlist):
        matchdict[index[ind]] = match
    return matchdict

def split(a):  # function to split list in POOL_SIZE even parts
    k, m = divmod(N, POOL_SIZE)
    divisions = [(i + 1) * k + min(i + 1, m) - (i * k + min(i, m)) for i in range(POOL_SIZE)]
    parts = []
    for division in divisions:
        part = [next(a) for _ in range(division)]
        parts.append(part)
    return parts

def main():
    global N

    random.seed(1)

    # N will finally scale up to 10^9
    N = 10000000
    M = 300

    # Q and Z size is fixed at 120000
    Q = (str(random.randrange(M)) for _ in range(120000))
    Z = (str(random.randrange(M)) for _ in range(120000))
    Q_Z = list(zip(Q, Z)) # pre-compute the `zip` function

    # for multiprocessing:
    # split lists to number of processors (POOL_SIZE)
    # generator functions:
    index = (str(x) for x in range(N))
    index_split = split(index)
    with concurrent.futures.ProcessPoolExecutor(POOL_SIZE, initializer=init_pool, initargs=(Q_Z,)) as executor:
        finallist = executor.map(matchdictfunc, index_split, range(8))
        matchdict = {}
        for d in finallist:
            matchdict.update(d)
    print(len(matchdict))

if __name__ == '__main__':
    t = time.time()
    main()
    print('total time =', time.time() - t)
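For reference, the multithreaded variant mentioned earlier is essentially a one-line change to the code above (a sketch; concurrent.futures.ThreadPoolExecutor accepts the same initializer/initargs arguments as ProcessPoolExecutor):

import concurrent.futures

# identical call to the process-pool version above; only the executor class differs
with concurrent.futures.ThreadPoolExecutor(POOL_SIZE, initializer=init_pool, initargs=(Q_Z,)) as executor:
    finallist = executor.map(matchdictfunc, index_split, range(8))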

The Cost of Inter-Process Memory Transfers

In the code below, the function create_files was called to create 100 identical files, each consisting of a "pickled" list of 1,000,000 numbers. I then used a multiprocessing pool of size 8 twice to read the 100 files and unpickle them to reconstitute the original lists. The difference between the first case (worker1) and the second case (worker2) is that in the second case the list is returned to the caller (but not saved, so that the memory can be garbage-collected immediately). The second case took more than three times as long as the first case. This can also partly explain why you do not see a speedup when you switch to multiprocessing.

from multiprocessing import Pool
import pickle
import time

def create_files():
    l = [i for i in range(1000000)]
    # create 100 identical files:
    for file in range(1, 101):
        with open(f'pkl/test{file}.pkl', 'wb') as f:
            pickle.dump(l, f)

def worker1(file):
    file_name = f'pkl/test{file}.pkl'
    with open(file_name, 'rb') as f:
        obj = pickle.load(f)

def worker2(file):
    file_name = f'pkl/test{file}.pkl'
    with open(file_name, 'rb') as f:
        obj = pickle.load(f)
    return file_name, obj

POOLSIZE = 8

if __name__ == '__main__':
    #create_files()
    pool = Pool(POOLSIZE)
    t = time.time()
    # no data returned:
    for file in range(1, 101):
        pool.apply_async(worker1, args=(file,))
    pool.close()
    pool.join()
    print(time.time() - t)

    pool = Pool(POOLSIZE)
    t = time.time()
    for file in range(1, 101):
        pool.apply_async(worker2, args=(file,))
    pool.close()
    pool.join()
    print(time.time() - t)

    t = time.time()
    for file in range(1, 101):
        worker2(file)
    print(time.time() - t)

Latest Update