python中的多处理任务



我想弄清楚如何用一种不寻常的公式来执行多处理任务。

基本上,给定两个列表,每个列表包含10个矩阵,我必须检查如果输入是(A, B)或反之(B, A),应用操作(我称之为fn)是否会得到相同的结果。

使用顺序方法,解决方案很简单:

#Given
A = [matrix_a1, ... , matrix_a10]
B =  [matrix_b1, ... , matrix_b10]
AB_BA= [fn(A[i], B[i])==fn(B[i], A[i]) for i in range(0, len(A)) ]

下一个任务有点奇怪,因为它需要严格设置超过10个线程并应用multiprocessing。限制是,您不能将所有单个比较分配给十个不同的进程,因为剩余的进程将未被使用。我不知道为什么请求似乎在使用"过程"。和";thread"互换。

这个任务看起来有点令人困惑,因为在多进程中,通常设置的是工作线程的最大数量,而不是最小数量。

我尝试使用使用ProcessPoolExecutor的解决方案,如下所示:

def equality(A, B,i):
res= fn(A[i], B[i]) == fn(B[i],A[i] )
return(res)
with concurrent.futures.ProcessPoolExecutor(max_workers=20) as executor:
idx=range(0, len(A))
results= executor.map(equality, A, B, idx)
for result in results:
print(result)

我的问题是我不确定如何检查资源使用情况。我曾天真地尝试使用ubuntu系统监视器和"top"来监控CPU使用情况。

此外,这个解决方案是我尝试过的最有效的,但是没有直接规范使用至少11个工人,所以这个解决方案似乎不符合要求。

我也尝试了其他的解决方案,比如直接使用pool。这将导致使用top调用10个python实例,但同样不超过10个。下面是我的尝试:

def equality(A, B):
res=fn(A, B) == fn(B,A )
return(res)
with mp.Pool(20) as p:
print(p.starmap(equality, ((A[i], B[i]) for i in range(0, len(A)))))

你有什么建议来解决这个请求,以及监控资源使用情况,以确保它是预期的工作?

事先非常感谢你的帮助。

我希望您一字不差地发表了实际的问题,因为您的描述有点不清楚。但这是我所知道的(或认为我知道):

除非您的工作函数equality完成的CPU处理量足够大,以便通过并行运行该函数获得的收益超过了不使用多处理(即启动进程,将数据从一个地址空间移动到另一个地址空间等)而不会产生的额外多处理开销,否则您的多处理代码将运行得更慢。因此,您应该设计worker函数来完成尽可能多的工作并传递尽可能少的数据。

当你指定…

results = executor.map(equality, A, B, idx)

…您的equality函数将为A,Bidx的每个元素调用一次。因此,传递的不是整个列表AB,而是单个元素(例如matrix_a1matrix_b1)。因此,传递一个idx参数是没有意义的:

def equality(matrix_a, matrix_b):
"""
matrix_a and matrix_a are each single elements of
lists A and B respecticely.
"""
return fn(matrix_a) == fn(matrix_b)

def main():
from os import cpu_count
from concurrent.futures import ProcessPoolExecutor
A = [matrix_a1, ... , matrix_a10]
B =  [matrix_b1, ... , matrix_b10]
# Do not create more processes then we have either
# CPU cores or the number of tasks that need to submit:
pool_size = min(cpu_count(), len(A))
with ProcessPoolExecutor(max_workers=pool_size) as executor:
AB_BA = list(executor.map(equality, A, B))
# This will be a list of 10 elements, each either `True` or `False`:
print(AB_BA)
# Required for Windows:
if __name__ == '__main__':
main()

因此,我们将向池大小为10的池提交10个任务。内部有一个"任务队列"所有传递给equality的参数都存在:

matrix_a1, matrix_b1 # task 1
matrix_a2, matrix_b2 # task 2
...
matrix_a10, matrix_b10 # task 10

池中空闲的任何进程都将捕获队列中要处理的下一个任务,并且结果将按任务提交顺序返回。但是,由于equality是一个运行时间很短的函数,除非函数fn足够复杂,所以有可能抢占第一个任务的池进程可以完成它,然后在操作系统调度其他池进程之前抢占第二个任务。因此,即使函数fn是足够的cpu密集型,也不能保证所有10个任务都将由10个池进程并行处理。如果您要在equality的开头插入对time.sleep(.1)的调用,这将给其他池进程一个"唤醒"的机会。并从任务队列中获取自己的任务。但是这会使你的程序更慢,因为为了这个目的而睡觉是完全没有效率的。但是我想说明的是,您不能确保所有池进程总是并发地处于活动状态。

最新更新