我在python中有一个列表lst
。我想在这个列表的每个项目上调用一个函数f
。该函数f
调用第三方函数g
。我还想通过列表lst
中的每个项目来测量每次函数调用g
所花费的时间。我想加快进程,所以我使用多处理池来并行执行。目前,我有以下代码,但它不起作用。我从这篇文章中了解到,map_async
只能调用一元函数。我还想利用在map_async
中创建多个进程的优势,所以我不想切换到apply_async
。有人能建议我,在这里有什么更好的选择来实现我的目标吗?
我目前的解决方案不起作用:
import multiprocessing as mp
time_metrics = {}
def f(idx):
global time_metrics
a = time.now()
g(idx)
b = time.now()
time_metrics[idx] = b-a
lst = [1, 2, 3, 4, 5, 6]
pool = mp.Pool(7)
pool.map_async(f, lst)
pool.close()
pool.join()
print(time_metrics)
多处理器不共享内存空间,它使用进程"forks"将当前进程状态(或仅部分状态,取决于使用的fork/spawn类型和操作系统(克隆到RAM中的新位置,并分配给新的进程ID,然后独立运行。如果你想使用共享内存区域,任务会变得更复杂,我发现在我的一些旧项目中,共享内存比使用队列将数据传递回父进程并存储到dict中要慢。
对于这个任务,尽管在我看来你不需要做任何事情,但你可以只返回时间值,然后在池完成执行后(在同步模式下,而不是异步模式下,这样进程池就会阻塞,直到所有进程都完成任务(,你可以迭代并收集结果。
因此,这里可能是最简单的解决方案:
from datetime import datetime
import multiprocessing as mp
time_metrics = {}
def g(a):
# placeholder function for whatever you have as g()
for i in range(5000*a):
pass
def f(idx):
# once spawned, a process calling this function cannot edit objects in the memory of the parent process,
# unless using the special shared memory objects in the mp class.
a = datetime.utcnow()
g(idx)
b = datetime.utcnow()
return (idx, b - a)
if __name__ == "__main__":
lst = [1, 2, 3, 4, 5, 6]
# don't assign 1 process for each job, use only number of cores your machine has, as rarely any benefit of using more, especially for benchmarking.
with mp.Pool() as pool:
# blocks until result is available
results = pool.map(f, lst)
for row in results:
time_metrics[row[0]] = row[1]
print(time_metrics)
如果您感兴趣,可以对其进行重构,使用多处理库中的共享内存字典或mp.Queue的实例将结果传递回父进程进行收集,但据我所见,这并不是解决这个问题的必要条件。
是否有任何理由确实需要使用池的异步版本,或者这种方法是否足够?
如果你真的想使用map_async,这个片段可以使用:
from datetime import datetime
import multiprocessing as mp
time_metrics = {}
def g(a):
for i in range(5000*a):
pass
def f(idx):
a = datetime.utcnow()
g(idx)
b = datetime.utcnow()
return (idx, b - a)
def append_res(result: tuple):
for row in result:
time_metrics[row[0]] = row[1]
if __name__ == "__main__":
lst = [1, 2, 3, 4, 5, 6]
# don't assign 1 process for each job, use only number of cores your machine has, as rarely any benefit of using more, especially for benchmarking.
with mp.Pool() as pool:
# doesn't block until result is available.
# callback is applied to list of results when all the tasks are complete
results = pool.map_async(f, lst, callback = append_res)
# wait for result to become available, otherwise parent process will exit the context manager and processes will not complete
results.wait()
print(time_metrics)
与.map((相比,我不能100%确定.map_sync((的行为,.map(。只要机器上的每个CPU核心不需要处理比现有核心多得多的python进程,这就可以根据需要进行基准测试,因为这只会增加开销和负载,并会给您带来不准确的基准测试。对于map_async,通常对于异步函数,单个结果可用的顺序可能不是它们被分配的顺序,这对我来说意味着所有任务都同时被分配到进程池,这可能会在任务之间产生CPU资源的竞争,并可能产生不准确的基准,尽管我可以请人在评论中对此进行确认。