所以我已经用numpy和multiprocessing写了一段时间的数字东西了。它可以工作,但我在收集结果时遇到了麻烦。我是这样做的,我用一个队列作为输入,一个队列作为输出。程序从输入队列中读取参数,对其进行处理,然后将结果放入输出队列。稍后在主进程中,我从队列中读取它并pickle它。像这样:
def fun(inp,outp):
while True:
try:
params = inp.get(block=False)
results = runprocess(params)
out.put(results,block=False)
except Empty:
break
之后在主循环中我做了以下操作:
for p in processes:
p.start()
for p in processes:
p.join()
while True:
try:
out = outp.get(block=False)
a[i] = [out]
except Empty:
break
fi = open(filename,"w")
cPickle.dump(fi,a)
fi.close()
但不知何故,总是发生两件事之一:要么pickle输出为空,要么进程挂起并保持运行,使用0%的cpu(一开始它们上升到100%,基本上是数字运算)。你知道我做错了什么吗?
好,我用Pool.map()重做了。为了让每个人都知道我是如何让它工作的,下面是代码片段:
ncpus = mp.cpu_count()
out = dict()
params = [(a,p) for p in np.arange(0.0,2.0,0.1) for a in np.arange(0.001,2.0,0.1)]
pool = mp.Pool(processes=ncpus)
results = pool.map(runm,params)
for i in results:
sigs = np.zeros((order,order))
sigsmf = np.zeros((order,order))
sigseq = np.zeros((order,order))
xs = np.array([])
freqs = np.array([])
[(a,p),sigs[:,:],sigsmf[:,:],sigseq[:,:],xs,freqs] = i
out[(a,p)] = [sigs[:,:],sigsmf[:,:],sigseq[:,:],xs,freqs]
print a, p, sigs[0,0]
工作就像一个魅力,更容易实现!
谢谢费迪南德!我不知道怎么做,但我想我们现在可以结束这个问题了!
您需要在get
呼叫中至少放置一个timeout
,并移除block
。在您当前的配置中,如果在调用get
时没有项可用,您将得到Empty
异常,跳出循环。如果您依赖于另一个线程来填充队列,而它没有及时填充队列,那么它将过早地退出循环并产生空结果。同样,put
可能会挂起,因为队列已满,从而挂起您的程序。
那么,用这样的语句:
params = inp.get(timeout=1)
out.put(timeout=1)
那是因为你有block=False。当收集器试图从队列中获取数据时,它不会立即在队列中找到数据。因此引发Empty异常并跳出循环
当从输入列表中获取数据时,我假设您可以指定block=False作为预填充列表。但是,输出队列是在运行时构建的。所以当你试图从中获取数据时,它可能是空的,因为输入过程需要更长的时间来处理。
如果您知道输入队列的长度,那么您可以尝试无限期地阻塞输出队列。如果没有,那么我建议您阻塞一个超时