在Python中共享多处理内存的更好方法



我已经解决这个问题一周了,它变得非常令人沮丧,因为每次我实现一个更简单但规模相似的示例来说明我需要做什么,结果发现多处理会把它搞砸。它处理共享内存的方式让我很困惑,因为它太有限了,很快就会变得毫无用处。

因此,我的问题的基本描述是,我需要创建一个过程,通过一些参数来打开图像,并创建大约20K个60x40大小的补丁。这些补丁一次保存到列表2中,并且需要返回到主线程,然后由GPU上运行的2个其他并发进程再次处理。

流程和工作流程以及所有大部分都得到了处理,我现在需要的是本应最简单的部分,结果却变成了最困难的部分。我无法保存并将带有20K补丁的列表返回到主线程。

第一个问题是因为我把这些补丁保存为PIL图像。然后我发现添加到Queue对象的所有数据都必须进行pickle。第二个问题是,我将补丁转换为每个60x40的数组,并将它们保存到列表中。现在这仍然不起作用?显然,队列可以保存的数据量有限,否则当您调用queue_obj.get()时,程序将挂起。

我尝试过很多其他的东西,但我尝试的每一个新东西都不起作用,所以我想知道是否有人推荐了一个库,我可以用它来共享对象,而不需要所有的模糊?

这是我正在研究的一个示例实现。请记住,这非常好用,但完整的实现并不好。我确实让代码打印信息消息,以查看正在保存的数据具有完全相同的形状和所有内容,但由于某种原因,它不起作用。在完全实现中,独立进程成功完成,但在q.get()处冻结。

from PIL import Image
from multiprocessing import Queue, Process
import StringIO
import numpy
img = Image.open("/path/to/image.jpg")
q = Queue()
q2 = Queue()
#
#
# MAX Individual Queue limit for 60x40 images in BW is 31,466.
# Multiple individual Queues can be filled to the max limit of 31,466.
# A single Queue can only take up to 31,466, even if split up in different puts.
def rz(patch, qn1, qn2):
    totalPatchCount = 20000
    channels = 1
    patch = patch.resize((60,40), Image.ANTIALIAS)
    patch = patch.convert('L')
    # ImgArray = numpy.asarray(im, dtype=numpy.float32)
    list_im_arr = []
    # ----Create a 4D Array
    # returnImageArray = numpy.zeros(shape=(totalPatchCount, channels, 40, 60))
    imgArray = numpy.asarray(patch, dtype=numpy.float32)
    imgArray = imgArray[numpy.newaxis, ...]
    # ----End 4D array
    # list_im_arr2 = []
    for i in xrange(totalPatchCount):
        # returnImageArray[i] = imgArray
        list_im_arr.append(imgArray)
    qn1.put(list_im_arr)
    qn1.cancel_join_thread()
    # qn2.cancel_join_thread()
    print "PROGRAM Done"
# rz(img,q,q2)
# l = q.get()
#
p = Process(target=rz,args=(img, q, q2,))
p.start()
p.join()
#
# # l = []
# # for i in xrange(1000): l.append(q.get())
#
imdata = q.get()

队列用于进程之间的通信。在你的情况下,你并没有真正的这种沟通。您可以简单地让进程返回结果,并使用.get()方法来收集它们。(记住添加if __name__ == "main":,请参阅编程指南)

from PIL import Image
from multiprocessing import Pool, Lock
import numpy
img = Image.open("/path/to/image.jpg")
def rz():
    totalPatchCount = 20000
    imgArray = numpy.asarray(patch, dtype=numpy.float32)
    list_im_arr = [imgArray] * totalPatchCount  # A more elegant way than a for loop
    return list_im_arr
if __name__ == '__main__':  
    # patch = img....  Your code to get generate patch here
    patch = patch.resize((60,40), Image.ANTIALIAS)
    patch = patch.convert('L')
    pool = Pool(2)
    imdata = [pool.apply_async(rz).get() for x in range(2)]
    pool.close()
    pool.join()

现在,根据本文的第一个答案,多处理只传递可拾取的对象。酸洗在多处理中可能是不可避免的,因为进程不共享内存。他们根本不生活在同一个宇宙里。(它们在刚出生时确实继承了记忆,但它们无法走出自己的宇宙)。PIL图像对象本身不可拾取。您可以通过只提取存储在其中的图像数据来使其可拾取,就像这篇文章建议的那样。

由于您的问题主要是I/O绑定的,因此您也可以尝试多线程处理。对于你的目的来说,它可能会更快。线程共享所有内容,因此不需要酸洗。如果您使用的是python3,ThreadPoolExecutor是一个很好的工具。对于Python 2,可以使用ThreadPool。为了获得更高的效率,你必须重新安排你做事的方式,你想打破这个过程,让不同的线程来完成任务。

from PIL import Image
from multiprocessing.pool import ThreadPool
from multiprocessing import Lock
import numpy
img = Image.open("/path/to/image.jpg")
lock = Lock():
totalPatchCount = 20000
def rz(x):
    patch = ...
    return patch
pool = ThreadPool(8)
imdata = [pool.map(rz, range(totalPatchCount)) for i in range(2)]
pool.close()
pool.join()

您说"显然,队列可以保存的数据量有限,否则当您调用queue_obj.get()时,程序将挂起。"

你是对的,也是错的。Queue将在不被耗尽的情况下保持有限的信息量。问题是,当你这样做时:

qn1.put(list_im_arr)
qn1.cancel_join_thread()

它调度到底层管道的通信(由线程处理)。qn1.cancel_join_thread()然后说"但如果我们在没有完成预定的put的情况下退出,那就很酷了",当然,几微秒后,worker函数退出,Process退出(而无需等待填充管道的线程真正这样做;充其量它可能已经发送了对象的初始字节,但任何不适合PIPE_BUF的东西几乎肯定会被丢弃;您需要一些惊人的竞争条件才能获得任何东西,更不用说整个大对象了)。所以稍后,当你这样做时:

imdata = q.get()

(现在退出的)CCD_ 9实际上没有发送任何内容。当您调用q.get()时,它正在等待从未真正传输过的数据。

另一个答案是正确的,即在计算和传递单个值的情况下,Queue s是多余的。但是,如果你要使用它们,你需要正确地使用它们。修复方法是:

  1. 删除对qn1.cancel_join_thread()的调用,使Process在数据通过管道传输之前不会退出
  2. 重新安排通话以避免死锁

重新排列就是这样:

p = Process(target=rz,args=(img, q, q2,))
p.start()
imdata = q.get()
p.join()

在CCD_ 15之后移动CCD_ 14;如果您先尝试join,则您的主进程将等待子进程退出,并且子进程将在队列退出之前等待队列被消耗(如果Queue的管道被主进程中的线程耗尽,这实际上可能有效,但最好不要指望实现细节;无论实现细节如何,只要puts和gets匹配,这种形式都是正确的)。

相关内容

  • 没有找到相关文章

最新更新