使用 python 的多处理模块在池工作线程中使用本地内存



我正在努力在python中实现随机算法。由于这涉及多次(例如N次(做同样的事情,因此它很自然地并行化,我想利用这一点。更具体地说,我想在 CPU 的所有内核上分配 N 次迭代。所讨论的问题涉及计算某物的最大值,因此每个工作线程都可以计算自己的最大值,然后只将那个最大值报告回父进程,然后父进程只需要从这几个局部最大值中找出全局最大值。

有点令人惊讶的是,这似乎不是多处理模块的预期用例,但我不完全确定如何去做。经过一些研究,我想出了以下解决方案(玩具问题,以在结构上与我的实际列表相同的列表中找到最大值(:

import random
import multiprocessing
l = []
N = 100
numCores = multiprocessing.cpu_count()
# globals for every worker
mySendPipe = None
myRecPipe = None
def doWork():
pipes = zip(*[multiprocessing.Pipe() for i in range(numCores)])
pool = multiprocessing.Pool(numCores, initializeWorker, (pipes,))
pool.map(findMax, range(N))
results = []
# collate results
for p in pipes[0]:
if p.poll():
results.append(p.recv())
print(results)
return max(results)
def initializeWorker(pipes):
global mySendPipe, myRecPipe
# ID of a worker process; they are consistently named PoolWorker-i
myID = int(multiprocessing.current_process().name.split("-")[1])-1
# Modulo: When starting a second pool for the second iteration of doWork() they are named with IDs 5-8.
mySendPipe = pipes[1][myID%numCores]
myRecPipe = pipes[0][myID%numCores]
def findMax(count):
myMax = 0
if myRecPipe.poll():
myMax = myRecPipe.recv()
value = random.choice(l)
if myMax < value:
myMax = value
mySendPipe.send(myMax)
l = range(1, 1001)
random.shuffle(l)
max1 = doWork()
l = range(1001, 2001)
random.shuffle(l)
max2 = doWork()
return (max1, max2)

这有点有效,但我有一个问题。也就是说,使用管道来存储中间结果感觉相当愚蠢(并且可能很慢(。但它也有一个真正的问题,即我无法通过管道发送任意大的东西,不幸的是,我的应用程序有时会超过这个大小(和死锁(。

因此,我真正想要的是一个类似于初始值设定项的函数,我可以为池中的每个工作线程调用一次,以将其本地结果返回到父进程。我找不到这样的功能,但也许这里有人有一个想法?

最后几点说明:

  • 使用全局变量作为输入,因为在我的应用程序中,输入非常大,我不想将其复制到每个进程。由于进程从不写入它,我相信它永远不应该被复制(或者我错了吗?我愿意接受以不同方式执行此操作的建议,但请注意,我需要在更改输入时运行它(尽管按顺序运行,就像上面的例子一样(。
  • 我想避免使用 Manager-class,因为(根据我的理解(它引入了同步和锁定,这在这个问题中应该是完全不必要的。

我能找到的唯一另一个类似的问题是 Python 的多处理和内存,但他们希望实际处理工作线程的单个结果,而我不希望工作线程返回 N 个东西,而是只运行总共 N 次并只返回他们的局部最佳结果。

我正在使用Python 2.7.15。


tl;dr:有没有办法为多处理池中的每个工作进程使用本地内存,以便每个工作线程都可以计算局部最优值,而父进程只需要担心找出其中哪一个是最好的?

你可能想多了。 通过让你的worker-functions(在本例中findMax(实际上返回一个值而不是传达它,你可以存储调用pool.map()的结果 - 毕竟它只是map的并行变体!它将在输入列表上映射一个函数,并返回该函数调用的结果列表。

说明我的观点的最简单的例子是你的"分布式最大值"示例:

import multiprocessing
# [0,1,2,3,4,5,6,7,8]
x = range(9)
# split the list into 3 chunks
# [(0, 1, 2), (3, 4, 5), (6, 7, 8)]
input = zip(*[iter(x)]*3)
pool = multiprocessing.Pool(2)
# compute the max of each chunk:
# max((0,1,2)) == 2
# max((3,4,5)) == 5
# ...
res = pool.map(max, input)
print(res)

这将返回[2, 5, 8]。 请注意,有一些光魔法正在发生:我使用内置的max()函数,该函数需要可迭代对象作为输入。现在,如果我只pool.map一个普通的整数列表,比如range(9),这将导致调用max(0)max(1)等 - 不是很有用,是吧?相反,我将列表划分为块,因此在映射时,我们现在有效地映射元组列表,从而在每次调用时馈送一个元max

所以也许你必须:

  • 从工作线程函数返回值
  • 考虑如何构建输入域,以便为每个工作线程提供有意义的块

PS:你写了一个很棒的第一个问题!谢谢,很高兴阅读它:)欢迎来到StackOverflow!

相关内容

  • 没有找到相关文章