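I am doing some parallelization for a school assignment: converting pictures to grayscale. But ever since I started using multiprocessing.Array, I get terrible results, and they get worse as the number of processors increases.
Speed was fine, as expected, when I wrote the results into a numpy array (but obviously I did not get a valid picture). Since I switched to the mp Array it has become sluggish, although I do get a correct grayscale picture out of it.
I also tried to implement this with processexecutor, but it froze when I passed the mp Array as an argument (it did not even reach the pid print, which is on the first line of the called function).
Code: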
from PIL import Image
import numpy
import time
from multiprocessing import Process
from multiprocessing import Array


def main2():
    for i in range(3):
        filename = "%d.jpg" % i
        cols, rows, mpx, source_picture, result_array_seq, result_array_par = load_picture(filename)
        print(filename, "%dx%d" % (cols, rows), "%f Mpx" % mpx)
        seq_time, new_image = grayscale_seq(cols, rows, source_picture, result_array_seq)
        new_image.save("%d_gray_seq.jpg" % i)
        print("Seq run:", time.strftime('%H:%M:%S', time.gmtime(seq_time)))
        for p in range(2, 7, 2):  # run on 2, 4 and 6 cores
            par_time, new_image = grayscale_par(cols, rows, source_picture, result_array_par, p)
            new_image.save("%d_gray_par_%d.jpg" % (i, p))
            print("Par run with %d cores" % p, time.strftime('%H:%M:%S', time.gmtime(par_time)))


def grayscale_seq(cols, rows, source_pic, dest_pic):
    time_start = time.time()
    for row in range(rows):
        for col in range(cols):
            r, g, b = source_pic[row, col]
            dest_pic[row, col] = calculate_rgb(r, g, b)
    time_end = time.time()
    time_diff = (time_end - time_start)
    result_array = numpy.array(dest_pic).astype(numpy.uint8)
    new_image = Image.fromarray(result_array)
    return time_diff, new_image


def grayscale_par(cols, rows, source_pic, dest_pic, num_of_cpus):
    tuples = splitIndex(num_of_cpus, rows)
    process_array = []
    for i in range(len(tuples)):
        start_index, end_index = tuples[i]
        picture_slice = source_pic[start_index:end_index, :, :]
        p = Process(target=grayscaleSlice, args=(picture_slice, dest_pic, tuples[i][0], cols))
        process_array.append(p)
    time_start = time.time()
    for process in process_array:
        process.start()
        # process.join()
    for process in process_array:
        process.join()
    time_end = time.time()
    time_diff = (time_end - time_start)
    image = numpy.array(dest_pic).astype(numpy.uint8).reshape(rows, cols)
    new_image = Image.fromarray(image)
    return time_diff, new_image


def splitIndex(numprocessors, height):
    # Split the rows into numprocessors contiguous (start, end) ranges;
    # the last range absorbs any remainder.
    resultarray = [0]
    previousindex = 0
    for i in range(numprocessors - 1):
        result = previousindex + height // numprocessors
        resultarray.append(result)
        previousindex = result
    indextuples = []
    for i in range(numprocessors):
        if i == numprocessors - 1:
            indextuples.append((resultarray[i], height))
            break
        indextuples.append((resultarray[i], resultarray[i + 1]))
    return indextuples


def grayscaleSlice(picture_slice, array, start_index, cols):
    for row in range(len(picture_slice)):
        for col in range(cols):
            R = picture_slice[row, col, 0]
            G = picture_slice[row, col, 1]
            B = picture_slice[row, col, 2]
            result = int(round(0.299 * R + 0.587 * G + 0.114 * B))
            # One write into the shared Array per pixel
            array[start_index * cols + row * cols + col] = result


def load_picture(filename):
    im = Image.open(filename)
    cols, rows = im.size
    mpx = (cols * rows) / 1000000
    source_picture = numpy.asarray(im)
    result_array_seq = numpy.zeros((rows, cols))
    result_array_par = Array('i', rows * cols)
    return cols, rows, mpx, source_picture, result_array_seq, result_array_par


def calculate_rgb(r, g, b):
    return int(round(0.299 * r + 0.587 * g + 0.114 * b))


# Entry point goes after the definitions so main2 exists when it is called
if __name__ == "__main__":
    main2()
Results:
0.jpg 500x375 0.187500 Mpx
Seq run: 00:00:06
Par run with 2 cores 00:00:01
Par run with 4 cores 00:00:01
Par run with 6 cores 00:00:02
1.jpg 1920x1200 2.304000 Mpx
Seq run: 00:00:16
Par run with 2 cores 00:00:12
Par run with 4 cores 00:00:18
Par run with 6 cores 00:00:21
2.jpg 3100x2074 6.429400 Mpx
Seq run: 00:00:47
Par run with 2 cores 00:00:32
Par run with 4 cores 00:00:45
Par run with 6 cores 00:01:03
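Every access to a shared-memory object such as multiprocessing's Array involves some inter-process synchronization. That is naturally slower than a normal memory access, and worse, by default each access locks the array, which means your worker processes spend most of their time locking each other out instead of doing real work.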
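To make the locking cost concrete, here is a minimal sketch (not taken from the code in the question) contrasting per-element writes with a single slice write into an Array created with the default lock=True. The helper names write_per_pixel and write_batched are hypothetical and exist only for illustration.

```python
from multiprocessing import Array


def write_per_pixel(shared, start, values):
    """Write one element at a time; every assignment acquires the Array's lock."""
    for offset, value in enumerate(values):
        shared[start + offset] = value


def write_batched(shared, start, values):
    """Write the whole run with one slice assignment, so the lock is taken once."""
    shared[start:start + len(values)] = values


if __name__ == "__main__":
    shared = Array('i', 8)  # lock=True is the default
    write_per_pixel(shared, 0, [1, 2, 3, 4])
    write_batched(shared, 4, [5, 6, 7, 8])
    print(shared[:])
```

Both helpers leave the same data in the array; the difference is only in how often the lock is acquired, which is what hurts once several processes write at the same time.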
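Instead of sending one pixel at a time, you can speed things up by batching the results. For example, you could switch to a model where each worker builds a list of the pixels that need updating, and the main process does the actual work of updating the destination array: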
from multiprocessing import Process, Queue


def grayscale_par(cols, rows, source_pic, dest_pic, num_of_cpus):
    # dest_pic is now a numpy array, just like in grayscale_seq
    tuples = splitIndex(num_of_cpus, rows)
    process_array = []
    # A queue for the workers to store result sets into
    queue = Queue()
    for i in range(len(tuples)):
        start_index, end_index = tuples[i]
        picture_slice = source_pic[start_index:end_index, :, :]
        p = Process(target=grayscaleSlice, args=(queue, picture_slice, tuples[i][0], cols))
        process_array.append(p)
    time_start = time.time()
    for process in process_array:
        process.start()
    # Go ahead and apply the result set from each worker once it's ready.
    # We want to do this once for each process we launched, so
    # loop through that array. The value is ignored, so just use an
    # underscore here.
    for _ in process_array:
        # Now, for each process, call queue.get() to get its result
        # array. This will block till it returns. Once it returns
        # we iterate through that array, one item at a time.
        # Each item is just a tuple telling us which pixel to
        # update to which value
        for row, col, value in queue.get():
            # So, finally, we update each pixel in turn
            dest_pic[row, col] = value
    # Join only after draining the queue, so no worker is left blocked in put()
    for process in process_array:
        process.join()
    time_end = time.time()
    time_diff = (time_end - time_start)
    image = numpy.array(dest_pic).astype(numpy.uint8)
    new_image = Image.fromarray(image)
    return time_diff, new_image


def grayscaleSlice(queue, picture_slice, start_index, cols):
    # Just store the result in a list
    result = []
    for row in range(len(picture_slice)):
        for col in range(cols):
            R = picture_slice[row, col, 0]
            G = picture_slice[row, col, 1]
            B = picture_slice[row, col, 2]
            value = int(round(0.299 * R + 0.587 * G + 0.114 * B))
            result.append((start_index + row, col, value))
    # All done, go ahead and send the result set along to the main process
    queue.put(result)
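This results in the worker processes being used much more effectively. On my machine, with a sample image, it outputs the following, showing a clear speedup as more cores are used.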
0.jpg 2272x1704 3.871488 Mpx
Seq run: 00:00:29
Par run with 2 cores 00:00:14
Par run with 4 cores 00:00:08
Par run with 6 cores 00:00:06
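As a possible further step along the same batching idea (an assumption on my part, not something I measured for the timings above), each worker could send back its whole slice as a single uint8 NumPy block instead of a list of (row, col, value) tuples. The names gray_block, gray_worker and collect_blocks below are hypothetical.

```python
import numpy


def gray_block(picture_slice):
    """Convert an (h, w, 3) RGB slice into an (h, w) uint8 grayscale block."""
    weights = numpy.array([0.299, 0.587, 0.114])
    # Weighted sum over the colour axis for every pixel at once
    gray = picture_slice[:, :, :3] @ weights
    return numpy.rint(gray).astype(numpy.uint8)


def gray_worker(queue, picture_slice, start_index):
    """Hypothetical worker: put one (start_row, block) pair per slice."""
    queue.put((start_index, gray_block(picture_slice)))


def collect_blocks(queue, dest_pic, num_workers):
    """Hypothetical parent-side loop: copy each block into place in one assignment."""
    for _ in range(num_workers):
        start, block = queue.get()
        dest_pic[start:start + block.shape[0], :] = block
```

In this sketch gray_worker would stand in for grayscaleSlice as the Process target, and collect_blocks would replace the nested loop over queue.get() in grayscale_par. Results may differ from the per-pixel version in rare rounding edge cases, since the floating-point sum is evaluated in a different order.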