Why does the following Python multiprocessing queue deadlock depending on the return value?



I am running into a deadlock with the following code.

import time
from multiprocessing import Process, Queue
import numpy as np

def f(q):
    q.put([(np.random.rand(50000), 0.993) for _ in range(10000)])

def g(q):
    time.sleep(3)  # to make both functions take approximately the same amount of time
    q.put('X' * 100000)

if __name__ == '__main__':
    print([(np.random.rand(50000), 0.993) for _ in range(10000)])
    queue = Queue()
    p = Process(target=g, args=(queue,))
    p.start()
    obj = queue.get()
    p.join()   # after get() as suggested in the docs
    print("Done g")
    print(obj)
    queue = Queue()
    p = Process(target=f, args=(queue,))
    p.start()
    obj = queue.get()
    p.join()  # after get() as suggested in the docs
    print("Done f")
    print(obj)

The function g is computed in a child process without any problem, whereas running a process to compute f results in what looks like a deadlock.

I know this is similar to this other question. However, the accepted answer there suggests either swapping q.get() and p.join() or removing p.join() entirely, and neither of those fixes my problem.

Furthermore, the print statement at the beginning shows that whatever f does should not take very long by itself.

Update: I realize this may be because the result of f is too large. If that is the case, I would like to know what the standard way of exchanging large data between processes is.

I can tell you that on my 8M Windows desktop the machine freezes even when the numpy arrays contain only 20,000 elements each.
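For a sense of scale (a rough check of my own, not taken from the original post): every put on a multiprocessing.Queue pickles its argument and streams the bytes through a pipe via a background feeder thread, and 10,000 tuples each holding a 50,000-element float64 array come to roughly 4 GB before any pickling overhead. A minimal sketch to estimate the serialized size:

import pickle
import numpy as np

# Estimate how much data one queued item represents once pickled.
item = (np.random.rand(50000), 0.993)
item_bytes = len(pickle.dumps(item, protocol=pickle.HIGHEST_PROTOCOL))
print(f"one item: ~{item_bytes / 1e6:.1f} MB pickled")               # roughly 0.4 MB
print(f"10,000 items: ~{item_bytes * 10000 / 1e9:.1f} GB in total")  # roughly 4 GB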

Here is how I would code it so that it runs faster and with lower memory utilization:

import time
from multiprocessing import Process, Queue
import numpy as np

def f(q):
    # Instead of putting a list with 10,000 elements,
    # do 10,000 puts, which can be retrieved by the main process
    # one by one to aggregate the results. This should reduce the
    # amount of memory taken up by the queue since elements are
    # being taken off the queue by the main process as the child process is
    # putting them on the queue:
    for _ in range(10000):
        q.put((np.random.rand(50000), 0.993))

if __name__ == '__main__':
    queue = Queue()
    p = Process(target=f, args=(queue,))
    t = time.time()
    p.start()
    obj = []
    for _ in range(10000):
        obj.append(queue.get())
    p.join()   # after get() as suggested in the docs
    print("Done f", time.time() - t)
    #print(obj)
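A related knob, offered here only as a sketch and not as part of the answer above: multiprocessing.Queue accepts a maxsize argument, so the producer's put() blocks once that many items are waiting to be consumed, which places a hard bound on how far the child can run ahead of the main process:

import time
from multiprocessing import Process, Queue
import numpy as np

def f(q):
    # put() blocks whenever `maxsize` items are already in flight,
    # so the child can never race far ahead of the consumer.
    for _ in range(10000):
        q.put((np.random.rand(50000), 0.993))

if __name__ == '__main__':
    queue = Queue(maxsize=100)  # bound the number of in-flight items
    p = Process(target=f, args=(queue,))
    t = time.time()
    p.start()
    obj = [queue.get() for _ in range(10000)]
    p.join()
    print("Done f", time.time() - t)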

If you do not know how many items f will put on the queue, then f should put one final sentinel item on the queue to signal that there is no more data to retrieve. The sentinel must be an object that cannot be mistaken for an actual data item; in this case None is the ideal sentinel:

import time
from multiprocessing import Process, Queue
import numpy as np

def f(q):
    # Instead of putting a list with 10,000 elements,
    # do 10,000 puts, which can be retrieved by the main process
    # one by one to aggregate the results. This should reduce the
    # amount of memory taken up by the queue since elements are
    # being taken off the queue by the main process as the child process is
    # putting them on the queue:
    for _ in range(10000):
        q.put((np.random.rand(50000), 0.993))
    # Put a sentinel:
    q.put(None)

if __name__ == '__main__':
    queue = Queue()
    p = Process(target=f, args=(queue,))
    t = time.time()
    p.start()
    obj = []
    while True:
        item = queue.get()
        # Sentinel?
        if item is None:
            break
        obj.append(item)
    p.join()   # after get() as suggested in the docs
    print("Done f", time.time() - t)
    #print(obj)
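As a side note of mine (not from the original answer), the sentinel loop can also be written with the two-argument form of iter(), which keeps calling queue.get until it returns the sentinel. This would replace the while loop in the script above:

    # Drop-in replacement for the while loop: iter(callable, sentinel)
    # calls queue.get() repeatedly and stops when it returns None.
    obj = list(iter(queue.get, None))
    p.join()   # after get() as suggested in the docs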
