How to retrieve values from a function run in parallel processes



The multiprocessing module is quite confusing for Python beginners, especially for those who have just migrated from MATLAB and have been made lazy by its Parallel Computing Toolbox. I have the following function which takes roughly 80 seconds to run, and I want to shorten that time by using Python's multiprocessing module.

from time import time

xmax   = 100000000

start = time()
for x in range(xmax):
    y = ((x+5)**2+x-40)
    if y <= 0xf+1:
        print('Condition met at: ', y, x)
end = time()

tt = end - start  # total time
print('Each iteration took: ', tt/xmax)
print('Total time:          ', tt)

This outputs as expected:

Condition met at:  -15 0
Condition met at:  -3 1
Condition met at:  11 2
Each iteration took:  8.667453265190124e-07
Total time:           86.67453265190125

Since no iteration of the loop depends on any other, I tried to adopt the Server process from the official docs to scan chunks of the range in separate processes. Eventually I arrived at vartec's answer to this question and could put together the following code. I also updated the code based on Darkonaut's answer to the current question.

from time import time
import multiprocessing as mp

def chunker(rng, t):  # this function makes t chunks out of rng
    L  = rng[1] - rng[0]
    Lr = L % t
    Lm = L // t
    h  = rng[0] - 1
    chunks = []
    for i in range(0, t):
        c  = [h+1, h + Lm]
        h += Lm
        chunks.append(c)
    chunks[t-1][1] += Lr + 1
    return chunks

def worker(lock, xrange, return_dict):
    '''worker function'''
    for x in range(xrange[0], xrange[1]):
        y = ((x+5)**2+x-40)
        if y <= 0xf+1:
            print('Condition met at: ', y, x)
            return_dict['x'].append(x)
            return_dict['y'].append(y)
            with lock:
                list_x = return_dict['x']
                list_y = return_dict['y']
                list_x.append(x)
                list_y.append(y)
                return_dict['x'] = list_x
                return_dict['y'] = list_y

if __name__ == '__main__':
    start = time()
    manager = mp.Manager()
    return_dict = manager.dict()
    lock = manager.Lock()
    return_dict['x'] = manager.list()
    return_dict['y'] = manager.list()
    xmax = 100000000
    nw = mp.cpu_count()
    workers = list(range(0, nw))
    chunks = chunker([0, xmax], nw)

    jobs = []
    for i in workers:
        p = mp.Process(target=worker, args=(lock, chunks[i], return_dict))
        jobs.append(p)
        p.start()

    for proc in jobs:
        proc.join()

    end = time()
    tt = end - start  # total time
    print('Each iteration took: ', tt/xmax)
    print('Total time:          ', tt)
    print(return_dict['x'])
    print(return_dict['y'])

This brings the run time down considerably, to ~17 seconds. But my shared variables don't retrieve any values. Please help me figure out which part of the code is going wrong.

The output I get is:

Each iteration took:  1.7742713451385497e-07
Total time:           17.742713451385498
[]
[]

whereas I was expecting:

Each iteration took:  1.7742713451385497e-07
Total time:           17.742713451385498
[0, 1, 2]
[-15, -3, 11]

The problem in your example is that modifications to standard mutable structures within Manager.dict will not be propagated. I'm first showing you how to fix it with a manager, just to show you better options afterwards.
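You can see the effect in isolation with a few lines. This is a minimal sketch of the pitfall (my own illustration, not code from the question):

from multiprocessing import Manager

# A standard (non-proxy) list stored in a Manager.dict is copied on every
# access, so in-place mutations only ever hit a temporary local copy.
if __name__ == '__main__':
    with Manager() as manager:
        d = manager.dict()
        d['x'] = []            # standard list as value
        d['x'].append(1)       # appends to a local copy, change is lost
        print(d['x'])          # -> []
        d['x'] = d['x'] + [1]  # re-assignment triggers __setitem__ on the proxy
        print(d['x'])          # -> [1]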

multiprocessing.Manager is a bit heavy, since it uses a separate process just for the Manager, and working on its shared objects requires locks for data consistency. If you run this on one machine, there are better options with multiprocessing.Pool, in case you don't have to run customized Process classes, and if you have to, multiprocessing.Process together with multiprocessing.Queue would be the common way of doing it.

The quoted parts below are from the multiprocessing docs.


Manager

If standard (non-proxy) list or dict objects are contained in a referent, modifications to those mutable values will not be propagated through the manager because the proxy has no way of knowing when the values contained within are modified. However, storing a value in a container proxy (which triggers a __setitem__ on the proxy object) does propagate through the manager, and so to effectively modify such an item, one could re-assign the modified value to the container proxy...

In your case this would look like:

def worker(xrange, return_dict, lock):
    """worker function"""
    for x in range(xrange[0], xrange[1]):
        y = ((x+5)**2+x-40)
        if y <= 0xf+1:
            print('Condition met at: ', y, x)
            with lock:
                list_x = return_dict['x']
                list_y = return_dict['y']
                list_x.append(x)
                list_y.append(y)
                return_dict['x'] = list_x
                return_dict['y'] = list_y

The lock here would be a manager.Lock instance you have to pass along as an argument, since the whole (now) locked operation is not by itself atomic. (Here is an easier example with a Manager using Lock.)
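For illustration, such a locked read-modify-write with a Manager could look like this minimal sketch (a made-up counter example, only to show the pattern):

from multiprocessing import Manager, Process

def increment(d, lock, n):
    """Increment a shared counter; the read-modify-write is not atomic."""
    for _ in range(n):
        with lock:
            d['count'] = d['count'] + 1  # re-assign so the change propagates

if __name__ == '__main__':
    with Manager() as manager:
        d = manager.dict(count=0)
        lock = manager.Lock()
        procs = [Process(target=increment, args=(d, lock, 1000))
                 for _ in range(4)]
        for p in procs:
            p.start()
        for p in procs:
            p.join()
        print(d['count'])  # reliably 4000 with the lock, typically less without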

This approach is perhaps less convenient than employing nested proxy objects for most use cases, but it also demonstrates a level of control over the synchronization.

Since Python 3.6 proxy objects are nestable:

Changed in version 3.6: Shared objects are capable of being nested. For example, a shared container object such as a shared list can contain other shared objects which will all be managed and synchronized by the SyncManager.

Since Python 3.6 you can fill your manager.dict with manager.list instances as values before starting the multiprocessing, and then append directly in the worker without having to reassign:

return_dict['x'] = manager.list()
return_dict['y'] = manager.list()

EDIT:

Here is the full example with Manager:

import time
import multiprocessing as mp
from multiprocessing import Manager, Process
from contextlib import contextmanager
# mp_utils.py from first link in code-snippet for "Pool"
# section below
from mp_utils import calc_batch_sizes, build_batch_ranges

# def context_timer ... see code snippet in "Pool" section below


def worker(batch_range, return_dict, lock):
    """worker function"""
    for x in batch_range:
        y = ((x+5)**2+x-40)
        if y <= 0xf+1:
            print('Condition met at: ', y, x)
            with lock:
                return_dict['x'].append(x)
                return_dict['y'].append(y)


if __name__ == '__main__':

    N_WORKERS = mp.cpu_count()
    X_MAX = 100000000

    batch_sizes = calc_batch_sizes(X_MAX, n_workers=N_WORKERS)
    batch_ranges = build_batch_ranges(batch_sizes)
    print(batch_ranges)

    with Manager() as manager:
        lock = manager.Lock()
        return_dict = manager.dict()
        return_dict['x'] = manager.list()
        return_dict['y'] = manager.list()

        tasks = [(batch_range, return_dict, lock)
                 for batch_range in batch_ranges]

        with context_timer():
            pool = [Process(target=worker, args=args)
                    for args in tasks]
            for p in pool:
                p.start()
            for p in pool:
                p.join()

        # Create standard container with data from manager before exiting
        # the manager.
        result = {k: list(v) for k, v in return_dict.items()}

    print(result)

Pool

Most often a multiprocessing.Pool will just do it. You have an additional challenge in your example, since you want to distribute iteration over a range. Your chunker function doesn't manage to divide the range evenly, so the processes don't all get the same amount of work:

chunker((0, 21), 4)
# Out: [[0, 4], [5, 9], [10, 14], [15, 21]]  # 4, 4, 4, 6!

For the code below, please grab the code snippet for mp_utils.py from my answer here; it provides two functions to chunk ranges as evenly as possible.
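In case you just want to follow along here, the two helpers could be sketched roughly like this (my reconstruction matching the signatures and the output shown below; use the linked answer for the exact version):

# mp_utils.py (sketch)
from itertools import accumulate

def calc_batch_sizes(n_tasks: int, n_workers: int) -> list:
    """Divide n_tasks between n_workers; resulting sizes differ by at most 1."""
    x, y = divmod(n_tasks, n_workers)
    return [x + 1] * y + [x] * (n_workers - y)

def build_batch_ranges(batch_sizes: list) -> list:
    """Convert consecutive batch sizes into range objects."""
    upper_bounds = list(accumulate(batch_sizes))
    lower_bounds = [0] + upper_bounds[:-1]
    return [range(l, u) for l, u in zip(lower_bounds, upper_bounds)]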

With multiprocessing.Pool your worker function just has to return the result, and Pool will take care of transporting the result back over internal queues to the parent process. The result will be a list, so you will have to rearrange it in the way you want. Your example could then look like this:

import time
import multiprocessing as mp
from multiprocessing import Pool
from contextlib import contextmanager
from itertools import chain

from mp_utils import calc_batch_sizes, build_batch_ranges


@contextmanager
def context_timer():
    start_time = time.perf_counter()
    yield
    end_time = time.perf_counter()
    total_time = end_time - start_time
    print(f'\nEach iteration took: {total_time / X_MAX:.4f} s')
    print(f'Total time:          {total_time:.4f} s\n')


def worker(batch_range):
    """worker function"""
    result = []
    for x in batch_range:
        y = ((x+5)**2+x-40)
        if y <= 0xf+1:
            print('Condition met at: ', y, x)
            result.append((x, y))
    return result


if __name__ == '__main__':

    N_WORKERS = mp.cpu_count()
    X_MAX = 100000000

    batch_sizes = calc_batch_sizes(X_MAX, n_workers=N_WORKERS)
    batch_ranges = build_batch_ranges(batch_sizes)
    print(batch_ranges)

    with context_timer():
        with Pool(N_WORKERS) as pool:
            results = pool.map(worker, iterable=batch_ranges)

    print(f'results: {results}')
    x, y = zip(*chain.from_iterable(results))  # flatten and unzip results
    print(f'results sorted: x: {x}, y: {y}')

Example output:

[range(0, 12500000), range(12500000, 25000000), range(25000000, 37500000),
 range(37500000, 50000000), range(50000000, 62500000), range(62500000, 75000000),
 range(75000000, 87500000), range(87500000, 100000000)]
Condition met at:  -15 0
Condition met at:  -3 1
Condition met at:  11 2
Each iteration took: 0.0000 s
Total time:          8.2408 s
results: [[(0, -15), (1, -3), (2, 11)], [], [], [], [], [], [], []]
results sorted: x: (0, 1, 2), y: (-15, -3, 11)
Process finished with exit code 0

If your worker had multiple arguments, you would build a "tasks" list with argument tuples and exchange pool.map(...) for pool.starmap(...iterable=tasks). See the docs for further details.
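A minimal sketch of that swap, building on the Pool example above (the second argument offset is made up purely for illustration):

from multiprocessing import Pool

def worker(batch_range, offset):
    """worker function taking two arguments"""
    return [(x, ((x + offset)**2 + x - 40)) for x in batch_range]

if __name__ == '__main__':
    # batch_ranges and N_WORKERS as computed in the Pool example above
    tasks = [(batch_range, 5) for batch_range in batch_ranges]  # argument tuples
    with Pool(N_WORKERS) as pool:
        results = pool.starmap(worker, iterable=tasks)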


Process & Queue

If you can't use multiprocessing.Pool for some reason, you have to take care of inter-process communication yourself, by passing a multiprocessing.Queue as an argument to your worker functions in the child processes and letting them enqueue their results to be sent back to the parent.

You will also have to build your Pool-like structure so you can iterate over it to start and join the processes, and you have to get() the results back from the queue. Note that the results are drained from the queue before the processes are joined; joining a process that still has items buffered in a queue can deadlock. More about Queue.get usage I've written up here.

A solution with this approach could look like this:

import multiprocessing as mp
from multiprocessing import Process
from itertools import chain

# context_timer, calc_batch_sizes and build_batch_ranges are reused
# from the "Pool" section above.
from mp_utils import calc_batch_sizes, build_batch_ranges


def worker(result_queue, batch_range):
    """worker function"""
    result = []
    for x in batch_range:
        y = ((x+5)**2+x-40)
        if y <= 0xf+1:
            print('Condition met at: ', y, x)
            result.append((x, y))
    result_queue.put(result)  # <--


if __name__ == '__main__':

    N_WORKERS = mp.cpu_count()
    X_MAX = 100000000

    result_queue = mp.Queue()  # <--
    batch_sizes = calc_batch_sizes(X_MAX, n_workers=N_WORKERS)
    batch_ranges = build_batch_ranges(batch_sizes)
    print(batch_ranges)

    with context_timer():
        pool = [Process(target=worker, args=(result_queue, batch_range))
                for batch_range in batch_ranges]
        for p in pool:
            p.start()
        results = [result_queue.get() for _ in batch_ranges]
        for p in pool:
            p.join()

    print(f'results: {results}')
    x, y = zip(*chain.from_iterable(results))  # flatten and unzip results
    print(f'results sorted: x: {x}, y: {y}')
