Delay caused by fat arguments in Python multiprocessing



I am using multiprocessing in Python to parallelize some computation-heavy functions. But I found that process creation is delayed if a fat argument is passed (for example, a 1000-node NetworkX graph or a list of 1,000,000 items). I experimented with two multiprocessing modules, "multiprocessing" and "pathos", and got similar results. My question is how to avoid this delay, because it destroys the benefit that parallel computing is supposed to bring.

In my sample code, I simply pass a fat argument to the function being multiprocessed; the function body does not use the argument at all.

  1. Sample code using "multiprocessing"
import multiprocessing
import time

def f(args):
    (x, conn, t0, graph) = args
    ans = 1
    x0 = x
    t = time.time() - t0
    conn.send('factorial of %d: start@%.2fs' % (x0, t))
    while x > 1:
        ans *= x
        time.sleep(0.5)
        x -= 1
    t = time.time() - t0
    conn.send('factorial of %d: finish@%.2fs, res = %d' % (x0, t, ans))
    return ans

def main():
    var = (4, 8, 12, 20, 16)
    p = multiprocessing.Pool(processes=4)
    p_conn, c_conn = multiprocessing.Pipe()
    params = []
    t0 = time.time()
    N = 1000
    import networkx as nx
    G = nx.complete_graph(N, nx.DiGraph())
    import random
    for (start, end) in G.edges:
        G.edges[start, end]['weight'] = random.random()
    for i in var:
        params.append((i, c_conn, t0, G))
    res = list(p.imap(f, params))
    p.close()
    p.join()
    print('output:')
    while p_conn.poll():
        print(p_conn.recv())
    t = time.time() - t0
    print('factorial of %s@%.2fs: %s' % (var, t, res))

if __name__ == '__main__':
    main()

Output of the above sample code:

output:
factorial of 4: start@29.78s
factorial of 4: finish@31.29s, res = 24
factorial of 8: start@53.56s
factorial of 8: finish@57.07s, res = 40320
factorial of 12: start@77.25s
factorial of 12: finish@82.75s, res = 479001600
factorial of 20: start@100.39s
factorial of 20: finish@109.91s, res = 2432902008176640000
factorial of 16: start@123.55s
factorial of 16: finish@131.05s, res = 20922789888000
factorial of (4, 8, 12, 20, 16)@131.06s: [24, 40320, 479001600, 2432902008176640000, 20922789888000]
Process finished with exit code 0

According to the above output, there is a delay of about 24 seconds between the creation of one process and the next.

  2. Sample code using "pathos"
import pathos
import multiprocess
import time

def f(x, conn, t0, graph):
    ans = 1
    x0 = x
    t = time.time() - t0
    conn.send('factorial of %d: start@%.2fs' % (x0, t))
    while x > 1:
        ans *= x
        time.sleep(0.5)
        x -= 1
    t = time.time() - t0
    conn.send('factorial of %d: finish@%.2fs, res = %d' % (x0, t, ans))
    return ans

def main():
    var = (4, 8, 12, 20, 16)
    p = pathos.multiprocessing.ProcessPool(nodes=4)
    p_conn, c_conn = multiprocess.Pipe()
    t0 = time.time()
    conn_s = [c_conn] * len(var)
    t0_s = [t0] * len(var)
    N = 1000
    import networkx as nx
    G = nx.complete_graph(N, nx.DiGraph())
    import random
    for (start, end) in G.edges:
        G.edges[start, end]['weight'] = random.random()
    res = list(p.imap(f, var, conn_s, t0_s, [G] * len(var)))
    print('output:')
    while p_conn.poll():
        print(p_conn.recv())
    t = time.time() - t0
    print('factorial of %s@%.2fs: %s' % (var, t, res))

if __name__ == '__main__':
    main()

Output of the above sample code:

output:
factorial of 4: start@29.63s
factorial of 4: finish@31.13s, res = 24
factorial of 8: start@53.50s
factorial of 8: finish@57.00s, res = 40320
factorial of 12: start@76.94s
factorial of 12: finish@82.44s, res = 479001600
factorial of 20: start@100.72s
factorial of 20: finish@110.23s, res = 2432902008176640000
factorial of 16: start@123.69s
factorial of 16: finish@131.20s, res = 20922789888000
factorial of (4, 8, 12, 20, 16)@131.20s: [24, 40320, 479001600, 2432902008176640000, 20922789888000]
Process finished with exit code 0

Again, according to the above output, there is a delay of about 24 seconds between the creation of one process and the next.

If I reduce the graph size (fewer nodes), the delay shrinks accordingly. I guess this is due to the extra time spent pickling/unpickling the NetworkX graph as an argument. Ideally, the first 4 processes should be created at the same time. How can I avoid this cost? Thanks!
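
One common way to sidestep the per-task pickling cost is to make the graph available to the workers before they start, instead of shipping it inside every task argument. Below is a minimal sketch of that idea, assuming a fork-based start method (the default on Linux and on many macOS Python builds); it is an illustration, not code from the question.

# A minimal sketch, assuming a fork-based start method:
# build the graph at module level before creating the Pool, so the workers
# inherit it when they fork instead of receiving it as a pickled argument.
# Only the small per-task value x crosses the process boundary.
import multiprocessing
import random
import networkx as nx

G = nx.complete_graph(1000, nx.DiGraph())
for (u, v) in G.edges:
    G.edges[u, v]['weight'] = random.random()

def f(x):
    # G is visible here as a module-level global and is never pickled
    ans = 1
    while x > 1:
        ans *= x
        x -= 1
    return ans

if __name__ == '__main__':
    with multiprocessing.Pool(processes=4) as p:
        print(list(p.imap(f, (4, 8, 12, 20, 16))))

With this layout only the integer x is serialized per task; under the spawn start method the module would be re-imported (and the graph rebuilt) in every worker, so this trick is specific to fork.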


Update

Thanks to Alexander's kind answer, I removed the Pipe from both the "multiprocessing" and the "pathos" code. The "multiprocessing" code now behaves like Alexander's (the delay drops to about 1 second), but the "pathos" code still has a delay of more than 20 seconds. The modified "pathos" code is posted below:

import pathos
import multiprocess
import time
from pympler import asizeof
import sys

def f(args):
    (x, graph) = args
    t = time.ctime()
    print('factorial of %d: start@%s' % (x, t))
    time.sleep(4)
    return x

def main():
    t0 = time.time()
    params = []
    var = (4, 8, 12, 20, 16)
    p = pathos.multiprocessing.ProcessPool(nodes=4)
    N = 1000
    import networkx as nx
    G = nx.complete_graph(N, nx.DiGraph())
    import random
    for (start, end) in G.edges:
        G.edges[start, end]['weight'] = random.random()
    print('Size of G by sys', sys.getsizeof(G), 'asizeof', asizeof.asizeof(G))
    print('G created in %.2f' % (time.time() - t0))
    for i in var:
        params.append((i, G))
    res = list(p.imap(f, params))
    p.close()
    p.join()

if __name__ == '__main__':
    main()

The output is:

Size of G by sys 56 asizeof 338079824
G created in 17.36
factorial of 4: start@Fri May 31 11:39:26 2019
factorial of 8: start@Fri May 31 11:39:53 2019
factorial of 12: start@Fri May 31 11:40:19 2019
factorial of 20: start@Fri May 31 11:40:44 2019
factorial of 16: start@Fri May 31 11:41:10 2019
Process finished with exit code 0

When each process is created, this fat argument (338 MB) presumably has to be copied into separate memory, but that should not take this long (24 seconds).
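
To check whether serialization, rather than raw memory copying, is what dominates those 24 seconds, one can time a single pickle round-trip of the graph; multiprocessing repeats this work for every task whose argument tuple contains G. This is an illustrative measurement, not part of the original code.

# An illustrative measurement: time one pickle round-trip of the graph.
import pickle
import random
import time
import networkx as nx

G = nx.complete_graph(1000, nx.DiGraph())
for (u, v) in G.edges:
    G.edges[u, v]['weight'] = random.random()

t0 = time.time()
data = pickle.dumps(G, protocol=pickle.HIGHEST_PROTOCOL)
print('pickle.dumps: %.2fs, %d bytes' % (time.time() - t0, len(data)))

t0 = time.time()
pickle.loads(data)
print('pickle.loads: %.2fs' % (time.time() - t0))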

Here is how it works on my computer:

  • The program hangs in conn.send. The problem with code (1.) is multiprocessing.Pipe(). From https://docs.python.org/3.4/library/multiprocessing.html?highlight=process: "... note that data in a pipe may become corrupted if two processes (or threads) try to read from or write to the same end of the pipe at the same time." A safer way to collect the messages is sketched below.
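
A minimal sketch (not the answer's code) of one such alternative: a Manager().Queue() proxy can be written to by many workers and, unlike a raw Pipe end, can be pickled into Pool task arguments.

# A minimal sketch: collect progress messages through a Manager().Queue()
# proxy, which is safe with many writers and picklable into Pool task args.
import multiprocessing
import time

def f(args):
    (x, q, t0) = args
    ans, x0 = 1, x
    q.put('factorial of %d: start@%.2fs' % (x0, time.time() - t0))
    while x > 1:
        ans *= x
        x -= 1
    q.put('factorial of %d: finish@%.2fs, res = %d' % (x0, time.time() - t0, ans))
    return ans

if __name__ == '__main__':
    t0 = time.time()
    with multiprocessing.Manager() as m, multiprocessing.Pool(processes=4) as p:
        q = m.Queue()
        res = list(p.imap(f, [(i, q, t0) for i in (4, 8, 12, 20, 16)]))
        while not q.empty():
            print(q.get())
    print(res)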

So, I changed the code:

import multiprocessing
import os
import time
import sys
from pympler import asizeof
import networkx as nx
import random

def factorial(args):
    (x, t, graph) = args
    s0 = '# pid %s x %2d' % (format(os.getpid()), x)
    s1 = 'started @ %.2f' % (time.time() - t)
    print(s0, s1)
    f = 1
    while x > 1:
        f *= x
        x -= 1
        time.sleep(0.5)
    s2 = 'ended   @ %.2f' % (time.time() - t)
    print(s0, s2, f)
    return s0, s1, s2, f

if __name__ == '__main__':
    t0 = time.time()
    N = 1000
    G = nx.complete_graph(N, nx.DiGraph())
    for (start, end) in G.edges:
        G.edges[start, end]['weight'] = random.random()
    print('Size of G by sys', sys.getsizeof(G), 'asizeof', asizeof.asizeof(G))
    print('G created in %.2f' % (time.time() - t0))
    t0 = time.time()
    p = multiprocessing.Pool(processes=4)
    outputs = list(p.imap(factorial, [(i, t0, G) for i in (4, 8, 12, 20, 16)]))
    print('output:')
    for output in outputs:
        print(output)

Now the output is:

Size of G by sys 56 asizeof 338079824
G created in 13.03
# pid 2266 x  4 started @ 1.27
# pid 2267 x  8 started @ 1.98
# pid 2268 x 12 started @ 2.72
# pid 2266 x  4 ended   @ 2.77 24
# pid 2269 x 20 started @ 3.44
# pid 2266 x 16 started @ 4.09
# pid 2267 x  8 ended   @ 5.49 40320
# pid 2268 x 12 ended   @ 8.23 479001600
# pid 2266 x 16 ended   @ 11.60 20922789888000
# pid 2269 x 20 ended   @ 12.95 2432902008176640000
output:
('# pid 2266 x  4', 'started @ 1.27', 'ended   @ 2.77', 24)
('# pid 2267 x  8', 'started @ 1.98', 'ended   @ 5.49', 40320)
('# pid 2268 x 12', 'started @ 2.72', 'ended   @ 8.23', 479001600)
('# pid 2269 x 20', 'started @ 3.44', 'ended   @ 12.95', 2432902008176640000)
('# pid 2266 x 16', 'started @ 4.09', 'ended   @ 11.60', 20922789888000)

338 MB of data is created in about 11 seconds, and yes, starting the first 4 processes does take time. The delays between their starts are much smaller though: 0.71, 0.74, and 0.72 seconds. I have an iMac with an Intel i5 @ 3.2 GHz.

The largest N with no visible delay is 78:

Size of G by sys 56 asizeof 1970464
G created in 0.08
# pid 2242 x  4 started @ 0.01
# pid 2243 x  8 started @ 0.01
# pid 2244 x 12 started @ 0.01
# pid 2245 x 20 started @ 0.01
# pid 2242 x  4 ended   @ 1.51 24
# pid 2242 x 16 started @ 1.53
# pid 2243 x  8 ended   @ 3.52 40320
# pid 2244 x 12 ended   @ 5.52 479001600
# pid 2242 x 16 ended   @ 9.04 20922789888000
# pid 2245 x 20 ended   @ 9.53 2432902008176640000
output:
('# pid 2242 x  4', 'started @ 0.01', 'ended   @ 1.51', 24)
('# pid 2243 x  8', 'started @ 0.01', 'ended   @ 3.52', 40320)
('# pid 2244 x 12', 'started @ 0.01', 'ended   @ 5.52', 479001600)
('# pid 2245 x 20', 'started @ 0.01', 'ended   @ 9.53', 2432902008176640000)
('# pid 2242 x 16', 'started @ 1.53', 'ended   @ 9.04', 20922789888000)

I changed N to 50 and ran the "pathos" code under the debugger in PyCharm. I stopped it after "G created in 7.79". The output below confirms my suspicion about why "pathos" is slower: pathos uses connection and socket objects (depending on the platform) to pass arguments and start subprocesses. That is why it is so much slower, roughly 30 times. The bright side: it works over a network.

Debugger output:

/usr/local/bin/python3.7 "/Applications/PyCharm CE.app/Contents/helpers/pydev/pydevd.py" --multiproc --qt-support=auto --client 127.0.0.1 --port 51876 --file /Users/alex/PycharmProjects/game/object_type.py
pydev debugger: process 1526 is connecting
Connected to pydev debugger (build 191.6605.12)
Size of G by sys 56 asizeof 57126904
G created in 7.79
Process ForkPoolWorker-3:
Process ForkPoolWorker-2:
Process ForkPoolWorker-1:
Process ForkPoolWorker-4:
Traceback (most recent call last):
Traceback (most recent call last):
File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/multiprocess/process.py", line 297, in _bootstrap
self.run()
File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/multiprocess/process.py", line 297, in _bootstrap
self.run()
File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/multiprocess/process.py", line 99, in run
self._target(*self._args, **self._kwargs)
File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/multiprocess/pool.py", line 110, in worker
task = get()
File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/multiprocess/process.py", line 99, in run
self._target(*self._args, **self._kwargs)
File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/multiprocess/queues.py", line 354, in get
with self._rlock:
File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/multiprocess/pool.py", line 110, in worker
task = get()
File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/multiprocess/synchronize.py", line 102, in __enter__
return self._semlock.__enter__()
File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/multiprocess/queues.py", line 354, in get
with self._rlock:
Traceback (most recent call last):
File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/multiprocess/synchronize.py", line 102, in __enter__
return self._semlock.__enter__()
KeyboardInterrupt
KeyboardInterrupt
File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/multiprocess/process.py", line 297, in _bootstrap
self.run()
File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/multiprocess/process.py", line 99, in run
self._target(*self._args, **self._kwargs)
File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/multiprocess/pool.py", line 110, in worker
task = get()
File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/multiprocess/queues.py", line 355, in get
res = self._reader.recv_bytes()
File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/multiprocess/connection.py", line 219, in recv_bytes
buf = self._recv_bytes(maxlength)
File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/multiprocess/connection.py", line 410, in _recv_bytes
buf = self._recv(4)
File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/multiprocess/connection.py", line 382, in _recv
chunk = read(handle, remaining)
Traceback (most recent call last):
KeyboardInterrupt
File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/multiprocess/process.py", line 297, in _bootstrap
self.run()
File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/multiprocess/process.py", line 99, in run
self._target(*self._args, **self._kwargs)
File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/multiprocess/pool.py", line 110, in worker
task = get()
File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/multiprocess/queues.py", line 354, in get
with self._rlock:
File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/multiprocess/synchronize.py", line 102, in __enter__
return self._semlock.__enter__()
KeyboardInterrupt
Traceback (most recent call last):
File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/multiprocess/pool.py", line 733, in next
item = self._items.popleft()
IndexError: pop from an empty deque
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/Applications/PyCharm CE.app/Contents/helpers/pydev/pydevd.py", line 1741, in <module>
main()
File "/Applications/PyCharm CE.app/Contents/helpers/pydev/pydevd.py", line 1735, in main
globals = debugger.run(setup['file'], None, None, is_module)
File "/Applications/PyCharm CE.app/Contents/helpers/pydev/pydevd.py", line 1135, in run
pydev_imports.execfile(file, globals, locals)  # execute the script
File "/Applications/PyCharm CE.app/Contents/helpers/pydev/_pydev_imps/_pydev_execfile.py", line 18, in execfile
exec(compile(contents+"\n", file, 'exec'), glob, loc)
File "/Users/alex/PycharmProjects/game/object_type.py", line 100, in <module>
outputs = list(p.imap(factorial, [(i, t0, G) for i in (4, 8, 12, 20, 16)]))
File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/multiprocess/pool.py", line 737, in next
self._cond.wait(timeout)
File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/threading.py", line 296, in wait
waiter.acquire()
KeyboardInterrupt

On a related note: I ran into this problem when trying to pass a pandas DataFrame as an argument to a function whose parallel processing was managed by joblib.

Joblib pickles the arguments to pass the information to each worker. Even moderately sized (<1 MB) DataFrames can be very time-consuming. In my case the pickling was so bad that joblib with 10-20 workers was slower than a simple loop. However, joblib handles lists, dicts and np.arrays much more efficiently. So a simple trick I found is to pass a list containing the DataFrame contents as an np.array plus the columns, and to reassemble the DataFrame inside the function.

Passing param=[df.values, df.columns] to joblib is 50x faster than simply passing param=df.
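
A hedged sketch of that trick: ship the raw ndarray plus the column labels and rebuild the DataFrame inside the worker. The function name process_df and the joblib parameters here are illustrative, not from the original text.

# A sketch of the trick above: pass df.values and df.columns separately
# and reassemble the DataFrame inside the worker function.
import numpy as np
import pandas as pd
from joblib import Parallel, delayed

def process_df(values, columns):
    df = pd.DataFrame(values, columns=columns)  # cheap reassembly
    return df.sum().sum()

if __name__ == '__main__':
    df = pd.DataFrame(np.random.rand(100000, 10),
                      columns=['c%d' % i for i in range(10)])
    out = Parallel(n_jobs=4)(
        delayed(process_df)(df.values, df.columns) for _ in range(8))
    print(out)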
