I am iterating over a very large mesh. Since the iterations are independent, I want to split the mesh into smaller pieces and run them concurrently to reduce the computation time. Below is sample code. For example, if mesh has length=50000, I want to split it into 100 pieces and run fnc on each piece concurrently.
import numpy as np

def fnc(data, mesh):
    d = []
    for i, dummy_val in enumerate(mesh):
        # Euclidean distance from every data point to mesh point i
        d.append(np.sqrt((data[:, 0] - mesh[i, 0])**2.0 + (data[:, 1] - mesh[i, 1])**2.0))
    return d

interpolate = fnc(mydata, mymesh)
I would like to know how to achieve this with multiprocessing or multithreading, since I cannot work out how to coordinate it with the execution of the loop.
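For reference, the splitting step by itself can be done with np.array_split; a minimal sketch (the random array here is a synthetic stand-in for mymesh):

import numpy as np

mymesh = np.random.rand(50000, 2)     # synthetic stand-in for the real mesh
chunks = np.array_split(mymesh, 100)  # 100 sub-meshes, split along axis 0
print(len(chunks), chunks[0].shape)   # 100 (500, 2)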
This should give you the general idea. I cannot test it because I do not have your data. The default constructor for ProcessPoolExecutor will use the number of processors on your computer. Since that determines how much multiprocessing you can actually get, it will probably be more efficient to set the N_CHUNKS parameter to the number of simultaneous processes you can support. That is, if your processing pool has size 6, it is better to split the array into 6 large chunks and let 6 processes do all the work than to break it into smaller pieces whose processes have to wait for a chance to run. So you should probably pass ProcessPoolExecutor a specific max_workers value no greater than the number of processors you have, and set N_CHUNKS to the same value.
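If you do not know the processor count up front, one way to pick that shared value at runtime is os.cpu_count(); a small sketch (it can return None, hence the fallback):

import os

n_workers = os.cpu_count() or 1  # number of logical CPUs, with a fallback of 1
N_CHUNKS = n_workers             # one large chunk per worker, as argued above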
from concurrent.futures import ProcessPoolExecutor, as_completed
import numpy as np

def fnc(data, mesh):
    d = []
    for i, dummy_val in enumerate(mesh):
        # Euclidean distance from every data point to mesh point i
        d.append(np.sqrt((data[:, 0] - mesh[i, 0])**2.0 + (data[:, 1] - mesh[i, 1])**2.0))
    return d

def main(data, mesh):
    #N_CHUNKS = 100
    N_CHUNKS = 6  # assuming you have 6 processors; see max_workers parameter
    n = len(mesh)
    assert n != 0
    if n <= N_CHUNKS:
        # fewer mesh rows than chunks: fall back to a single chunk
        N_CHUNKS = 1
        chunk_size = n
        last_chunk_size = n
    else:
        chunk_size = n // N_CHUNKS
        last_chunk_size = n - chunk_size * (N_CHUNKS - 1)
    with ProcessPoolExecutor(max_workers=N_CHUNKS) as executor:  # assuming you have 6 processors
        the_futures = {}
        start = 0
        for i in range(N_CHUNKS - 1):
            future = executor.submit(fnc, data, mesh[start:start + chunk_size])  # pass slice
            the_futures[future] = (start, start + chunk_size)  # map future to request parameters
            start += chunk_size
        if last_chunk_size:
            future = executor.submit(fnc, data, mesh[start:n])  # pass slice
            the_futures[future] = (start, n)
        for future in as_completed(the_futures):
            (start, end) = the_futures[future]  # the original range
            d = future.result()  # do something with the results

if __name__ == '__main__':
    # the call to main must be made in a block governed by if __name__ == '__main__',
    # otherwise each spawned subprocess re-executes the module's top-level code and
    # calls main again
    main(data, mesh)  # data and mesh are your actual arrays (e.g. mydata, mymesh)
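If you want to try this end to end, here is one possible self-contained driver with synthetic data; run_chunked and the array sizes are my own illustration, not the questioner's code, and it reassembles the per-chunk results in the original mesh order:

from concurrent.futures import ProcessPoolExecutor, as_completed
import numpy as np

def fnc(data, mesh):
    # same per-row distance computation as above
    d = []
    for i, dummy_val in enumerate(mesh):
        d.append(np.sqrt((data[:, 0] - mesh[i, 0])**2.0 + (data[:, 1] - mesh[i, 1])**2.0))
    return d

def run_chunked(data, mesh, n_chunks=6):
    # split the mesh row indices into n_chunks contiguous ranges
    pieces = np.array_split(np.arange(len(mesh)), n_chunks)
    results = [None] * n_chunks
    with ProcessPoolExecutor(max_workers=n_chunks) as executor:
        futures = {executor.submit(fnc, data, mesh[idx[0]:idx[-1] + 1]): k
                   for k, idx in enumerate(pieces) if len(idx)}
        for future in as_completed(futures):
            results[futures[future]] = future.result()
    # flatten back into one list, in the original mesh order
    return [row for chunk in results if chunk for row in chunk]

if __name__ == '__main__':
    rng = np.random.default_rng(0)
    mydata = rng.random((1000, 2))  # synthetic stand-in for the real data
    mymesh = rng.random((300, 2))   # synthetic stand-in for the real mesh
    interpolate = run_chunked(mydata, mymesh)
    print(len(interpolate), interpolate[0].shape)  # expect: 300 (1000,)

As a side note, the inner loop in fnc computes plain pairwise distances, so it could likely also be vectorized with NumPy broadcasting, which may buy as much speed as the multiprocessing itself.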