Python 多处理池/进程的结果不一致



我的共享字典对象的条目数不一致。它应该有 500,但大多数测试最终会在 450 到 465 之间。我也尝试使用mapProcess而不是apply_async

map稍微好一点,因为共享词典有大约 480 个条目而不是大约 450 个条目,但它仍然不一致,并且并非全部 500 个条目都符合预期。

我也尝试使用Process,但这导致我的共享字典中的条目数量最少 - 大约420。

以下是使用 apply_async 的完整代码:

import numpy as np
from PIL import Image
from os import listdir
from multiprocessing import Manager, Pool
def processImage(path, d):
  image = np.array(Image.open(source + "/" + path))
  # Copy lists from shared dictionary since updates don't work otherwise
  w = d["width"]
  h = d["height"]
  w.append(image.shape[0])
  h.append(image.shape[1])
  d["width"] = w
  d["height"] = h
if __name__ == "__main__":
  source = "./sample/images"
  p = Pool()
  m = Manager()
  d = m.dict()
  d["width"], d["height"] = [], []
  for path in listdir(source):
    p.apply_async(processImage, (path, d))
  p.close()
  p.join()

以下是使用 map 的完整代码:

def processImage(obj):
  image = np.array(Image.open(source + "/" + obj[1]))
  w = obj[0]["width"]
  h = obj[0]["height"]
  w.append(image.shape[0])
  h.append(image.shape[1])
  obj[0]["width"] = w
  obj[0]["height"] = h
if __name__ == "__main__":
  source = "./sample/images"
  p = Pool()
  m = Manager()
  d = m.dict()
  d["width"], d["height"] = [], []
  p.map(processImage, zip(itertools.repeat(d), listdir(source)))

以下是使用 Process 的完整代码:

def processImage(path, d):
  image = np.array(Image.open(source + "/" + path))
  w = d["width"]
  h = d["height"]
  w.append(image.shape[0])
  h.append(image.shape[1])
  d["width"] = w
  d["height"] = h
if __name__ == "__main__":
  source = "./sample/images"
  p = Pool()
  m = Manager()
  d = m.dict()
  d["width"], d["height"] = [], []
  jobs = []
  for img in listdir(source):
    p = Process(target=processImage, args=(img, d))
    p.start()
    jobs.append(p)
  for j in jobs:
    j.join()
这是

竞争条件的典型示例。您需要某种同步原语来更新d

考虑以下情况:有两个线程(在您的情况下是子进程(执行processImage。首先获得wh,其次获得wh。首先将一些东西附加到两者并将其放回d。Second 对自己的 wh 做了一些事情,这不再考虑第一个线程所做的更改,并将其放回d 。此时,第一个线程所做的更改将丢失。

要解决此问题,您需要保护使用d的代码部分:

from multiprocessing import Manager, Pool, Lock
...
lock = Lock()
...
def processImage(path, d):
    image = np.array(Image.open(source + "/" + path))
    lock.acquire()
    d["width"].append(image.shape[0])
    d["height"].append(image.shape[1])
    lock.release()

相关内容

  • 没有找到相关文章

最新更新