I am running into what looks like a deadlock with the following code.
import time
from multiprocessing import Process, Queue
import numpy as np

def f(q):
    q.put([(np.random.rand(50000), 0.993) for _ in range(10000)])

def g(q):
    time.sleep(3)  # to make both functions take approximately the same amount of time
    q.put('X' * 100000)

if __name__ == '__main__':
    print([(np.random.rand(50000), 0.993) for _ in range(10000)])
    queue = Queue()
    p = Process(target=g, args=(queue,))
    p.start()
    obj = queue.get()
    p.join()  # after get() as suggested in the docs
    print("Done g")
    print(obj)
    queue = Queue()
    p = Process(target=f, args=(queue,))
    p.start()
    obj = queue.get()
    p.join()  # after get() as suggested in the docs
    print("Done f")
    print(obj)
The function g is computed in the child process without any trouble, but running a process to compute f results in (what looks like) a deadlock.

I know this is similar to enter link description here. However, the accepted answer there suggests either swapping q.get() and p.join(), or removing p.join() altogether. Doing either does not solve my problem.

Also, the print statement at the top shows that whatever f is doing should not take very long.

UPDATE: I realize this may be because the result of f is too large. If that is the case, I would like to know what the standard way of exchanging large data between processes is.
I can tell you that on my 8M Windows desktop, the machine freezes even when the numpy arrays contain only 20,000 elements.
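A rough back-of-envelope estimate (my own, purely illustrative) suggests why: each (array, float) item with a 50,000-element float64 array is about 400 KB, so the single list that f puts on the queue carries roughly 4 GB of array data (still about 1.6 GB with 20,000-element arrays), all of which has to be pickled to travel through the queue and then unpickled again in the main process:

import numpy as np

# Illustrative size estimate for the payload of the single q.put() in f:
item_bytes = np.random.rand(50000).nbytes    # 50,000 * 8 bytes = 400,000 bytes
total_bytes = 10000 * item_bytes             # ~4e9 bytes, roughly 4 GB
print(f"one item: {item_bytes:,} bytes, whole list: ~{total_bytes:,} bytes")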
Here is how I would code it so that it runs faster and with lower memory utilization:
import time
from multiprocessing import Process, Queue
import numpy as np

def f(q):
    # Instead of putting a single list with 10,000 elements,
    # do 10,000 puts, which can be retrieved by the main process
    # one by one to aggregate the results. This should reduce the
    # amount of memory taken up by the queue, since elements are
    # being taken off the queue by the main process as the child process
    # is putting them on the queue:
    for _ in range(10000):
        q.put((np.random.rand(50000), 0.993))

if __name__ == '__main__':
    queue = Queue()
    p = Process(target=f, args=(queue,))
    t = time.time()
    p.start()
    obj = []
    for _ in range(10000):
        obj.append(queue.get())
    p.join()  # after get() as suggested in the docs
    print("Done f", time.time() - t)
    #print(obj)
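If the 10,000 individual puts turn out to be slow, a middle ground is to put the items in batches. This is just a sketch of my own (the CHUNK size of 100 is an arbitrary illustrative value), not something the code above depends on: each put then carries more data per pickle/IPC round trip, while the queue still never holds anything close to the full result:

import time
from multiprocessing import Process, Queue
import numpy as np

CHUNK = 100  # illustrative batch size; 10,000 items -> 100 puts

def f(q):
    # Put the items in batches of CHUNK rather than one at a time,
    # to amortize the per-put pickling/queue overhead:
    for _ in range(10000 // CHUNK):
        q.put([(np.random.rand(50000), 0.993) for _ in range(CHUNK)])

if __name__ == '__main__':
    queue = Queue()
    p = Process(target=f, args=(queue,))
    t = time.time()
    p.start()
    obj = []
    for _ in range(10000 // CHUNK):
        obj.extend(queue.get())
    p.join()  # after get() as suggested in the docs
    print("Done f (batched)", time.time() - t)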
If you don't know how many items f will be putting on the queue, then f should put a final sentinel item on the queue signifying that there is no more data to retrieve. This sentinel must be an object that cannot be mistaken for an actual data item. In this case, None is the ideal sentinel:
import time
from multiprocessing import Process, Queue
import numpy as np

def f(q):
    # Instead of putting a single list with 10,000 elements,
    # do 10,000 puts, which can be retrieved by the main process
    # one by one to aggregate the results. This should reduce the
    # amount of memory taken up by the queue, since elements are
    # being taken off the queue by the main process as the child process
    # is putting them on the queue:
    for _ in range(10000):
        q.put((np.random.rand(50000), 0.993))
    # Put a sentinel to signal that there is no more data:
    q.put(None)

if __name__ == '__main__':
    queue = Queue()
    p = Process(target=f, args=(queue,))
    t = time.time()
    p.start()
    obj = []
    while True:
        item = queue.get()
        # Sentinel?
        if item is None:
            break
        obj.append(item)
    p.join()  # after get() as suggested in the docs
    print("Done f", time.time() - t)
    #print(obj)
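As an aside (my own variation, not part of the code above), the sentinel loop can also be written with the two-argument form of iter(), which keeps calling queue.get() until it returns the sentinel None:

import time
from multiprocessing import Process, Queue
import numpy as np

def f(q):
    for _ in range(10000):
        q.put((np.random.rand(50000), 0.993))
    q.put(None)  # sentinel: no more data

if __name__ == '__main__':
    queue = Queue()
    p = Process(target=f, args=(queue,))
    t = time.time()
    p.start()
    # iter(callable, sentinel) calls queue.get() repeatedly and stops
    # when it returns None, so no explicit break is needed:
    obj = list(iter(queue.get, None))
    p.join()  # after get() as suggested in the docs
    print("Done f", time.time() - t)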