我有一个简单的示例问题,我在python中遇到的问题。我正在使用Multiprocess执行一个示例,其中函数" thread_test()"将在0到1的间隔上生成一个均匀的随机数数组,而数组中的" sample_size"数量数据点数。一旦下了示例,我计划生成该过程的多个副本,以加速代码执行,然后将一组更复杂的计算集中在thread_test()中。只要我将sample_size保持在9,000以下,该示例就可以正常工作。当我将sample_size从10增加到8,000时,执行时间增加,但是在8,000时,代码仅需0.01秒即可执行。但是,一旦我将sample_size提高到9,000,代码就会永远执行,并且永远不会完成计算。是什么原因造成的?
from multiprocessing import Process, Queue
import queue
import random
import timeit
import numpy as np
def Thread_Test(Sample_Size):
q.put(np.random.uniform(0,1,Sample_Size))
return
if __name__ == "__main__":
Sample_Size = 9000
q = Queue()
start = timeit.default_timer()
p = Process(target=Thread_Test,args=(Sample_Size,))
p.start()
p.join()
result = np.array([])
while True:
if not q.empty():
result = np.append(result,q.get())
else:
break
print (result)
stop = timeit.default_timer()
print ('{}{:4.2f}{}'.format("Computer Time: ", stop-start, " seconds"))
问题发生了,因为如果您将STH放入sub Process中(生产者),则必须保证主要过程(消费者)同时获得元素。否则,主要过程将在" p.join()"中等待,而sub进程在" queue.ut.put"中等待,因为队列中的elem太多,而没有消费者为新Elem腾出更多空间。
在此处:
Bear in mind that a process that has put items in a queue will wait before terminating until
all the buffered items are fed by the “feeder” thread to the underlying pipe
因此,简单地说,您需要在" p.join()"之前调用"获取部分"。
如果您担心子进程工作之前的主要过程退出,则可以将代码更改为以下:
while True:
# check subprocess running before break
running = p.is_alive()
if not q.empty():
result = np.append(result,q.get())
else:
if not running:
break
整个部分都喜欢以下:
def Thread_Test(q, Sample_Size):
q.put(np.random.uniform(0,1,Sample_Size))
if __name__ == "__main__":
Sample_Size = 9000
q = Queue()
start = timeit.default_timer()
p = Process(target=Thread_Test,args=(q, Sample_Size,))
p.daemon = True
p.start()
result = np.array([])
while True:
running = p.is_alive()
if not q.empty():
result = np.append(result,q.get())
else:
if not running:
break
p.join()
stop = timeit.default_timer()
print ('{}{:4.2f}{}'.format("Computer Time: ", stop-start, " seconds"))