Python 多处理在通过管道传递大型数组时卡住



我在python中使用多处理,并尝试通过管道将一个大的numpy数组传递给子进程。它适用于小数组,但挂起较大的数组而不返回错误。

我相信管道被堵塞了,并且已经阅读了一些关于它的信息,但无法弄清楚如何解决问题。

def f2(conn, x):
conn.start()
data = conn.recv()
conn.join()
print(data)
do_something(x)
conn.close()
if __name__ == '__main__':
data_input = read_data()    # large numpy array
parent_conn, child_conn = Pipe()
p = multiprocessing.Pool(processes=8)      
func = partial(f2, child_conn)
parent_conn.send(data_input)
parent_conn.close()
result = p.map(func, processes)
p.close()
p.join()

忽略此代码中的所有其他问题(您没有要传递给mapx,您不使用xf2接收,将Pool.mapPipe混合通常是错误的做法),您的最终问题是在工作进程可以从中读取之前执行阻塞send调用。

假设你真的想mapPipe混合,解决方案是在开始send之前异步启动map,所以当父尝试写入时,另一端有一些东西可以从Pipe读取:

if __name__ == '__main__':
data_input = read_data()    # large numpy array
parent_conn, child_conn = Pipe()
# Use with to avoid needing to explicitly close/join
with multiprocessing.Pool(processes=8) as p:
func = partial(f2, child_conn)
# Launch async map to ensure workers are running
future = p.map_async(func, x)
# Can perform blocking send as workers will consume as you send
parent_conn.send(data_input)
parent_conn.close()
# Now you can wait on the map to complete
result = future.get()

如前所述,由于x的问题,此代码将无法运行,即使运行,Pipe文档也明确警告不应同时从Pipe读取两个不同的进程。

如果您想在单个工作线程中批量处理数据,您只需使用ProcessPipe,如下所示:

def f2(conn):
data = conn.recv()
conn.close()
print(data)
if __name__ == '__main__':
parent_conn, child_conn = Pipe()
proc = multiprocessing.Process(target=f2, args=(child_conn,))
proc.start()
data_input = read_data()    # large numpy array
parent_conn.send(data_input)
parent_conn.close()
proc.join()

如果要跨多个工作线程单独处理每个元素,只需使用Poolmap

def f2(x):
print(x)
if __name__ == '__main__':
data_input = read_data()    # large numpy array
with multiprocessing.Pool(processes=8) as p:   
result = p.map(f2, data_input)

最新更新