并行Jupyter笔记本中的绘图循环



我使用的是Python 3.5.1版本。我想并行化一个循环,该循环用于使用imshow绘制一组数组。没有任何并行化的最小代码如下

import matplotlib.pyplot as plt
import numpy as np
# Generate data
arrays   = [np.random.rand(3,2) for x in range(10)]
arrays_2 = [np.random.rand(3,2) for x in range(10)]
# Loop and plot sequentially
for i in range(len(arrays)):
# Plot side by side
figure = plt.figure(figsize = (20, 12))
ax_1 = figure.add_subplot(1, 2, 1)
ax_2 = figure.add_subplot(1, 2, 2)
ax_1.imshow(arrays[i], interpolation='gaussian', cmap='RdBu', vmin=0.5*np.min(arrays[i]), vmax=0.5*np.max(arrays[i]))
ax_2.imshow(arrays_2[i], interpolation='gaussian', cmap='YlGn', vmin=0.5*np.min(arrays_2[i]), vmax=0.5*np.max(arrays_2[i]))
plt.savefig('./Figure_{}'.format(i), bbox_inches='tight')
plt.close()

这段代码目前是在Jupyter笔记本上写的,我只想通过Jupyter笔记进行所有处理。虽然这很有效,但实际上我有2500多个阵列,每秒大约1个绘图,这需要很长时间才能完成。我想做的是将计算分为N个处理器,这样每个处理器就可以绘制len(数组(/N个数组的图。由于这些图是单个阵列本身的图,因此在任何计算过程中,核心都不需要相互通信(无共享(。

我已经看到多处理程序包可以很好地解决类似的问题。然而,它不适用于我的问题,因为你不能将2D数组传递到函数中。如果我修改我的代码如上

# Generate data
arrays   = [np.random.rand(3,2) for x in range(10)]
arrays_2 = [np.random.rand(3,2) for x in range(10)]
x = list(zip(arrays, arrays_2))
def plot_file(information):
arrays, arrays_2 = list(information[0]), list(information[1])
print(np.shape(arrays[0][0]), np.shape(arrays_2[0][0]))

# Loop and plot sequentially
for i in range(len(arrays)):        
# Plot side by side
figure = plt.figure(figsize = (20, 12))
ax_1 = figure.add_subplot(1, 2, 1)
ax_2 = figure.add_subplot(1, 2, 2)
ax_1.imshow(arrays[i], interpolation='gaussian', cmap='RdBu', vmin=0.5*np.min(arrays[i]), vmax=0.5*np.max(arrays[i]))
ax_2.imshow(arrays_2[i], interpolation='gaussian', cmap='YlGn', vmin=0.5*np.min(arrays_2[i]), vmax=0.5*np.max(arrays_2[i]))
plt.savefig('./Figure_{}'.format(i), bbox_inches='tight')
plt.close()

from multiprocessing import Pool
pool = Pool(4)
pool.map(plot_file, x)

然后我得到错误"TypeError:Invalid dimensions for image data",数组维度的打印结果现在只有(2,(,而不是(3,2(。显然,这是因为多处理不能/不能将2D数组作为输入处理。

所以我在想,我怎么能在Jupyter笔记本里把它平行化?有人能教我怎么做吗?


编辑(2022年11月3日(:

我的原始代码的真正问题是pool.map(func,args(一次将一个args元素传递给单个处理器上的func,而不是像我想象的那样传递整个数组列表,这意味着当我试图在数组列表上循环时,我是在数组的行上循环,然后试图对行进行imshow绘图,从而产生错误。

无论如何,尽管这个问题已经有了一个很好的答案,但我想我会提供使用多处理的代码,只有在其他人有同样问题的情况下,或者如果有人想看看应该如何做的话。

n        = 10
arrays_1 = (np.random.rand(256, 256) for x in range(n))
arrays_2 = (np.random.rand(256, 256) for x in range(n))
x = zip(range(n), arrays_1, arrays_2) # need to pass the args into pool.map(func, args) as a tuple
def plot_file(information):

# get cpu name that is working on current data
process_name = multiprocessing.current_process().name
print('Process name {} is plotting'.format(process_name))
# unpack elements of tuple
index, arrays_1, arrays_2 = information

# plot
figure = plt.figure(figsize = (20, 12))
ax_1 = figure.add_subplot(1, 2, 1)
ax_2 = figure.add_subplot(1, 2, 2)
ax_1.imshow(arrays_1, interpolation='gaussian', cmap='RdBu')
ax_2.imshow(arrays_2, interpolation='gaussian', cmap='YlGn')
# save
plt.savefig('./{}'.format(index), bbox_inches='tight')
plt.close()
from multiprocessing import Pool
if __name__ == "__main__":

pool = multiprocessing.Pool(multiprocessing.cpu_count()//4) # use one quarter of available processors
pool.map(plot_file, x)                                      # sequentially map each element of x to the function and process

一种简单的方法是使用多处理引擎使用dask.distributed。我只建议使用一个外部模块,因为dask为您处理对象的序列化,这是一个非常简单的操作:

import matplotlib
# include this line to allow your processes to plot without a screen
matplotlib.use('Agg')
import matplotlib.pyplot as plt
import dask.distributed
import numpy as np
def plot_file(i, array_1, array_2):
matplotlib.use('Agg')
# will be called once for each array "job"
figure = plt.figure(figsize = (20, 12))
ax_1 = figure.add_subplot(1, 2, 1)
ax_2 = figure.add_subplot(1, 2, 2)
for ax, arr, cmap in [(ax_1, array_1, 'RdBu'), (ax_2, array_2, 'YlGn')]:
ax.imshow(
arr,
interpolation='gaussian',
cmap='RdBu',
vmin=0.5*np.min(arr),
vmax=0.5*np.max(arr),
)
figure.savefig('./Figure_{}'.format(i), bbox_inches='tight')
plt.close(figure)
arrays   = [np.random.rand(3,2) for x in range(10)]
arrays_2 = [np.random.rand(3,2) for x in range(10)]
client = dask.distributed.Client() # uses multiprocessing by default
futures = client.map(plot_file, range(len(arrays)), arrays, arrays_2)
dask.distributed.progress(futures)

然而,如果可能的话,更高效的方法是在映射任务中生成或准备数组。这将允许您并行执行阵列操作、I/O等:

def prep_arrays_and_plot(i):
array_1 = np.random.rand(3,2)
array_2 = np.random.rand(3,2)
plot_file(i, array_1, array_2)
futures = client.map(prep_arrays_and_plot, range(10))
dask.distributed.progress(futures)

在这一点上,您不需要麻烦任何东西,所以使用多处理进行编写并不是什么大不了的事情。以下脚本运行良好:

import matplotlib
matplotlib.use("Agg")
import matplotlib.pyplot as plt
import numpy as np
import multiprocessing
def plot_file(i, array_1, array_2):
matplotlib.use('Agg')
# will be called once for each array "job"
figure = plt.figure(figsize = (20, 12))
ax_1 = figure.add_subplot(1, 2, 1)
ax_2 = figure.add_subplot(1, 2, 2)
for ax, arr, cmap in [(ax_1, array_1, 'RdBu'), (ax_2, array_2, 'YlGn')]:
ax.imshow(
arr,
interpolation='gaussian',
cmap='RdBu',
vmin=0.5*np.min(arr),
vmax=0.5*np.max(arr),
)
figure.savefig('./Figure_{}'.format(i), bbox_inches='tight')
plt.close(figure)
def prep_arrays_and_plot(i):
array_1 = np.random.rand(3,2)
array_2 = np.random.rand(3,2)
plot_file(i, array_1, array_2)
def main():
pool = multiprocessing.Pool(4)
pool.map(prep_arrays_and_plot, range(10))
if __name__ == "__main__":
main()

请注意,如果您在jupyter笔记本上运行此程序,则不能简单地在单元格中定义函数并将其传递给多处理。水塘相反,您必须在不同的文件中定义它们并导入它们。这不适用于dask(事实上,如果使用dask在笔记本中定义函数,会更容易(。

最新更新