from multiprocessing import Pool

with Pool(processes=5) as p:
    p.starmap(name_of_function, all_inputs)
I have some code like the above that executes a function in parallel. Suppose all_inputs has 10,000 elements, and I'd like to know which element is currently being executed, e.g. number 100 out of 10,000... Is there a way to get that index?
The worker processes in a multiprocessing.Pool are instances of Process, and each one keeps an internal counter to identify itself. You can use this counter together with the operating-system process ID:
import os
from multiprocessing import current_process, Pool

def x(a):
    p = current_process()
    print('process counter:', p._identity[0], 'pid:', os.getpid())

if __name__ == '__main__':
    with Pool(2) as p:
        r = p.map(x, range(4))  # map() blocks until all tasks finish
which yields:
process counter: 1 pid: 29443
process counter: 2 pid: 29444
process counter: 2 pid: 29444
process counter: 1 pid: 29443
IIUC, you can also pass in the index (setup borrowed from @user1767754). Let me know if this isn't what you're looking for.
from multiprocessing import Pool

arr = [1, 2, 3, 4, 5]
arr_with_idx = zip(arr, range(len(arr)))

def x(a, idx):
    print(idx)
    return a*a

with Pool(5) as p:
    p.starmap(x, arr_with_idx)
Or, more concisely, use enumerate:
from multiprocessing import Pool

arr = [1, 2, 3, 4, 5]

def x(idx, a):  # arguments swapped here: enumerate yields (index, value)
    print(idx)
    return a*a

with Pool(5) as p:
    p.starmap(x, enumerate(arr))
starmap will unpack each tuple, and you can print out the index part.
You can use the current_process function from multiprocessing. If that is not precise enough, you can even pass each process a uuid as its name:
from multiprocessing import Pool, current_process

def x(a):
    print(current_process(), a)
    return a*a

with Pool(5) as p:
    p.map(x, [1, 2, 3, 4, 5])
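The uuid part isn't shown above; one way to realize it is a pool initializer that renames each worker. A minimal sketch, assuming a distinguishable per-worker name is all that's needed (set_unique_name is a hypothetical helper, not part of the answer above):

import uuid
from multiprocessing import Pool, current_process

def set_unique_name():
    # Runs once in each worker process; replaces the default
    # "ForkPoolWorker-N" name with a uuid. (Assumption: this is what
    # "pass the process a uuid name" means here.)
    current_process().name = str(uuid.uuid4())

def x(a):
    print(current_process().name, a)
    return a * a

if __name__ == '__main__':
    with Pool(5, initializer=set_unique_name) as p:
        p.map(x, [1, 2, 3, 4, 5])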
I suggest passing the index along with the other arguments. You can use enumerate, combined with a generator expression, to add the index to the existing arguments. Here is code that assumes all_inputs is an iterable of tuples:
with Pool(processes=5) as p:
    p.starmap(name_of_function, ((i,) + args for i, args in enumerate(all_inputs)))
You can choose from a range of variations on this general theme. For example, you can put the index at the end of the arguments instead of the beginning (just swap (i,) + args for args + (i,)).
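For instance, a minimal sketch of the index-first variant (the data and the body of name_of_function are stand-ins for illustration):

from multiprocessing import Pool

all_inputs = [(1, 10), (2, 20), (3, 30)]  # stand-in argument tuples

def name_of_function(i, a, b):
    # i is this tuple's position in all_inputs; a and b are the original arguments
    print('processing element {} of {}'.format(i, len(all_inputs)))
    return a + b

if __name__ == '__main__':
    with Pool(processes=5) as p:
        results = p.starmap(name_of_function,
                            ((i,) + args for i, args in enumerate(all_inputs)))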
The easiest way to find out which process is handling the value 100 is to split the array into equal chunks before the pool starts. This gives you control over which process in the pool handles which part of the array (since you pass each process a predefined pair of start and end indexes).

For example, if all_inputs has 51 elements with values ranging from 80 to 130, a simple job division on an 8-core CPU returns the following index pairs:
Process #0 will work on indexes [0:5] with values: 80 - 85
Process #1 will work on indexes [6:11] with values: 86 - 91
Process #2 will work on indexes [12:17] with values: 92 - 97
Process #3 will work on indexes [18:23] with values: 98 - 103
Process #4 will work on indexes [24:29] with values: 104 - 109
Process #5 will work on indexes [30:35] with values: 110 - 115
Process #6 will work on indexes [36:41] with values: 116 - 121
Process #7 will work on indexes [42:50] with values: 122 - 130
By the time the pool starts, you already know which process will be responsible for handling the value 100.

The success of this approach relies on jobDiv() working its magic and dividing the data among the processes, based on the size of the input array and the number of available CPU cores.
Source code:
import multiprocessing as mp

# processArray(): a parallel function that processes an array based on start and
# end index positions.
def processArray(procId, array, indexes):
    startIdx, endIdx = indexes
    print(" Process #" + str(procId) + " startIdx=" + str(startIdx) + " endIdx=" + str(endIdx))
    # Do some work:
    for i in range(startIdx, endIdx+1):
        print(" Process #" + str(procId) + " is computing index " + str(i) + " with value " + str(array[i]))

# jobDiv(): performs a simple job division between available CPU cores
def jobDiv(inputArray, numCPUs):
    jobs = []
    arrayLength = len(inputArray)
    jobRange = int(arrayLength / numCPUs)
    extra = arrayLength - (jobRange * numCPUs)
    prevEnd = 0
    for c in range(numCPUs):
        endIdx = (c * jobRange) + jobRange - 1
        if (c == (numCPUs-1)):
            endIdx += extra
        startIdx = prevEnd
        if ( (c > 0) and (startIdx+1 < arrayLength) ):
            startIdx += 1
        jobs.append( (startIdx, endIdx) )
        prevEnd = endIdx
    return jobs

if __name__ == '__main__':
    # Initialize dataset for multiprocessing with 51 numbers, with values from 80 to 130
    nums = range(80, 131)

    # How many CPU cores can be used for this dataset
    numCPUs = mp.cpu_count()
    if (numCPUs > len(nums)):
        numCPUs = len(nums)

    # This function returns a list of tuples containing array indexes for
    # each process to work on. When nums has 100 elements and numCPUs is 8,
    # it returns the following list:
    #   (0, 11), (12, 23), (24, 35), (36, 47), (48, 59), (60, 71), (72, 83), (84, 99)
    indexes = jobDiv(nums, numCPUs)

    # Prepare parameters for every process in the pool, where each process gets one tuple of:
    #   (cpu_id, array, array_indexes)
    jobArgs = []
    for id, arg in enumerate(indexes):
        start, end = arg
        print("Process #" + str(id) + " will work on indexes [" + str(start) + ":" + str(end) +
              "] with values: " + str(nums[start]) + " - " + str(nums[end]))
        jobArgs.append( (id, nums, arg) )

    print("* Starting Pool")

    # For every process, send the data for processing along with its respective tuple of parameters
    with mp.Pool(processes=numCPUs) as p:
        sums = p.starmap(processArray, jobArgs)

    print("* Finished")
Output:
* Starting Pool
Process #0 startIdx=0 endIdx=5
Process #0 is computing index 0 with value 80
Process #0 is computing index 1 with value 81
Process #0 is computing index 2 with value 82
Process #0 is computing index 3 with value 83
Process #0 is computing index 4 with value 84
Process #0 is computing index 5 with value 85
Process #1 startIdx=6 endIdx=11
Process #1 is computing index 6 with value 86
Process #1 is computing index 7 with value 87
Process #1 is computing index 8 with value 88
Process #1 is computing index 9 with value 89
Process #1 is computing index 10 with value 90
Process #1 is computing index 11 with value 91
Process #2 startIdx=12 endIdx=17
Process #2 is computing index 12 with value 92
Process #2 is computing index 13 with value 93
Process #2 is computing index 14 with value 94
Process #2 is computing index 15 with value 95
Process #2 is computing index 16 with value 96
Process #2 is computing index 17 with value 97
Process #3 startIdx=18 endIdx=23
Process #3 is computing index 18 with value 98
Process #3 is computing index 19 with value 99
Process #3 is computing index 20 with value 100
Process #3 is computing index 21 with value 101
Process #3 is computing index 22 with value 102
Process #4 startIdx=24 endIdx=29
Process #3 is computing index 23 with value 103
Process #4 is computing index 24 with value 104
Process #4 is computing index 25 with value 105
Process #4 is computing index 26 with value 106
Process #4 is computing index 27 with value 107
Process #4 is computing index 28 with value 108
Process #4 is computing index 29 with value 109
Process #5 startIdx=30 endIdx=35
Process #5 is computing index 30 with value 110
Process #5 is computing index 31 with value 111
Process #5 is computing index 32 with value 112
Process #5 is computing index 33 with value 113
Process #5 is computing index 34 with value 114
Process #5 is computing index 35 with value 115
Process #6 startIdx=36 endIdx=41
Process #6 is computing index 36 with value 116
Process #6 is computing index 37 with value 117
Process #6 is computing index 38 with value 118
Process #7 startIdx=42 endIdx=50
Process #6 is computing index 39 with value 119
Process #6 is computing index 40 with value 120
Process #7 is computing index 42 with value 122
Process #6 is computing index 41 with value 121
Process #7 is computing index 43 with value 123
Process #7 is computing index 44 with value 124
Process #7 is computing index 45 with value 125
Process #7 is computing index 46 with value 126
Process #7 is computing index 47 with value 127
Process #7 is computing index 48 with value 128
Process #7 is computing index 49 with value 129
Process #7 is computing index 50 with value 130
* Finished
It's worth stating the obvious: every process knows which value it is currently working on, but main() does not. main() only knows the index range each process will work on; it doesn't know (in real time) which values those processes are currently handling.

If you need main() to access this information while the processes are running, the best bet is to set up a Queue in main() and service it on a separate Thread before the pool starts. Then make sure the Queue object is passed as part of the arguments given to each process, so they all share the same object and can report the data they are currently working on.
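A minimal sketch of that setup, assuming a Manager().Queue (a plain multiprocessing.Queue cannot be passed through Pool arguments) and a hypothetical reporter thread:

import threading
import multiprocessing as mp

def worker(q, idx, value):
    q.put((idx, value))  # tell main() what this worker is about to process
    # ... the actual computation on value would go here ...

def reporter(q, total):
    # runs on a Thread inside main(), printing progress as it arrives
    for _ in range(total):
        idx, value = q.get()
        print("now processing index", idx, "with value", value)

if __name__ == '__main__':
    nums = list(range(80, 131))
    q = mp.Manager().Queue()
    t = threading.Thread(target=reporter, args=(q, len(nums)))
    t.start()
    with mp.Pool() as p:
        p.starmap(worker, ((q, i, v) for i, v in enumerate(nums)))
    t.join()  # reporter exits after receiving one message per item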
If you run this code on a modern multi-core CPU, the worker processes will be executing several tasks in parallel at any given point in time. You can use a queue to implement a custom protocol by which your workers tell the main process which task (by task index) they start and finish.
import time
import queue
import random
import multiprocessing

def fn(st_queue, i):
    st_queue.put((multiprocessing.current_process().name, i))
    time.sleep(random.random())  # your long calculation
    st_queue.put((multiprocessing.current_process().name, None))
    return i ** 2

def main():
    status = {}
    st_queue = multiprocessing.Manager().Queue()
    result = []
    pool = multiprocessing.Pool(4)
    args = zip([st_queue] * 20, range(20))
    async_res = pool.starmap_async(
        fn, args, callback=lambda r: result.append(r))
    while not async_res.ready():
        try:
            msg = st_queue.get(True, 0.1)
        except queue.Empty:
            pass
        else:
            status.update([msg])
            print(status)
    print(result.pop())
    pool.close()

if __name__ == '__main__':
    main()
The status dict looks like this:
{
    'ForkPoolWorker-4': None,
    'ForkPoolWorker-5': 16,
    'ForkPoolWorker-2': 18,
    'ForkPoolWorker-3': 15
}
If you're happy to have the indexes reported from the worker processes, and don't mind that they may be reported out of order, then you can use the enumerate approach others have suggested. If you want to track the work from the main process (e.g. to manage a status bar), and/or you need to know the total number of items started, you'll need each worker to report back to the parent when it starts an item. This can be done with a Pipe, as shown below.

I'm not sure whether you want the index of each item or the total count of items started (they may start out of sequence), so I report both.
# dummy data and function
all_inputs = list(zip(range(10), range(20,30)))
def name_of_function(a, b):
    return a+b

# main code
from multiprocessing import Pool, Pipe, Lock

parent_conn, child_conn = Pipe()
lock = Lock()

def wrapper(idx, args):
    with lock:
        child_conn.send(idx)
    return name_of_function(*args)

with Pool(processes=5) as p:
    p.starmap_async(wrapper, enumerate(all_inputs))

    # receive status updates while the pool is still alive
    num_items = len(all_inputs)
    for i in range(num_items):
        idx = parent_conn.recv()
        print("processing index {} ({}/{})".format(idx, i+1, num_items))

child_conn.close()
parent_conn.close()
# output (note that items may be started out of sequence):
# processing index 0 (1/10)
# processing index 1 (2/10)
# processing index 2 (3/10)
# processing index 3 (4/10)
# processing index 5 (5/10)
# processing index 6 (6/10)
# processing index 4 (7/10)
# processing index 7 (8/10)
# processing index 8 (9/10)
# processing index 9 (10/10)
Note that this uses starmap_async rather than starmap, so that the main thread keeps executing while the child processes run. Alternatively, you could use starmap and launch a separate thread to report progress, like this:
from threading import Thread
from multiprocessing import Pool, Pipe, Lock

parent_conn, child_conn = Pipe()
lock = Lock()

def receive_updates(num_items):
    for i in range(num_items):
        idx = parent_conn.recv()
        print("processing index {} ({}/{})".format(idx, i+1, num_items))

def wrapper(idx, args):
    with lock:
        child_conn.send(idx)
    return name_of_function(*args)

# launch another thread to receive the results, since the main thread
# will wait for starmap
result_thread = Thread(target=receive_updates, args=(len(all_inputs),))
result_thread.daemon = True  # stop if the main thread is killed
result_thread.start()

# your original code
with Pool(processes=5) as p:
    p.starmap(wrapper, enumerate(all_inputs))

child_conn.close()
parent_conn.close()
If you're satisfied with knowing when each item finishes (rather than when it starts), you can use apply_async with a callback, like this:
# dummy data and function
all_inputs = list(zip(range(10), range(20,30)))
def name_of_function(a, b):
    return a+b

# main code
from multiprocessing import Pool

num_items = len(all_inputs)
num_done = 0

def handle_result(res):
    global num_done
    num_done += 1
    print('finished item {} of {}.'.format(num_done, num_items))

p = Pool(5)
for args in all_inputs:
    p.apply_async(name_of_function, args, callback=handle_result)
p.close()
p.join()  # wait for tasks to finish
Result:
finished item 1 of 10.
finished item 2 of 10.
finished item 3 of 10.
finished item 4 of 10.
finished item 5 of 10.
finished item 6 of 10.
finished item 7 of 10.
finished item 8 of 10.
finished item 9 of 10.
finished item 10 of 10.
Note that the results won't necessarily arrive in the same order as all_inputs. If you need to know exactly which item was processed, you can enumerate the arguments like this:
from multiprocessing import Pool

num_items = len(all_inputs)
num_done = 0

def handle_result(idx):
    global num_done
    num_done += 1
    print('finished index {} ({}/{}).'.format(idx, num_done, num_items))

def wrapper(idx, args):
    name_of_function(*args)
    return idx

p = Pool(5)
for args in enumerate(all_inputs):
    p.apply_async(wrapper, args, callback=handle_result)
p.close()
p.join()  # wait for tasks to finish
Result:
finished index 0 (1/10).
finished index 1 (2/10).
finished index 2 (3/10).
finished index 3 (4/10).
finished index 4 (5/10).
finished index 6 (6/10).
finished index 8 (7/10).
finished index 7 (8/10).
finished index 9 (9/10).
finished index 5 (10/10).