Python multiprocessing loop always stops after a certain number of processes



I want to speed up loading and interpolating my data by using multiprocessing. So far my code seems to work. The only problem is that it only ever runs as many tasks as the given number of procs. Say I have 100 files: if I run the script with 24 cores, only 24 files get loaded, but I need it to keep going until all 100 are processed. What am I missing here?

print('Number of overall VTK Files: ' + str(len(vtkArray)))

def dataoperator(N, i, vtkArray, field):
    # for i in range(N-100, N):
    print("Loading data " + str(vtkArray[i, 1] + ' index :' + str(i)))
    points, cells, point_data, cell_data, field_data = meshio.read(ID + str(vtkArray[i, 1]))
    x, y, z = np.transpose(points)
    print("\t Interpolating data " + str(vtkArray[i, 1] + ' index :' + str(i)))
    from scipy.interpolate import griddata
    if (scalar == 'p'):
        p = np.transpose(point_data['p'])
        pi = griddata((x, y, z), p, (xx, yy, zz), method='nearest')
        field[i, :, :, :] = pi
    else:
        u, v, w = np.transpose(point_data['subtract(U,U_bf)'])
        if (scalar == 'u'):
            ui = griddata((x, y, z), u, (xx, yy, zz), method='nearest')
            field[i, :, :, :] = ui
        elif (scalar == 'v'):
            vi = griddata((x, y, z), v, (xx, yy, zz), method='nearest')
            field[i, :, :, :] = vi
        else:
            wi = griddata((x, y, z), w, (xx, yy, zz), method='nearest')
            field[i, :, :, :] = wi
    del points, cells, point_data, cell_data, field_data

import multiprocessing

jobs = []
for i in range(0, procs):
    process = multiprocessing.Process(target=dataoperator, args=(N, i, vtkArray, field))
    jobs.append(process)
# Start the processes
for j in jobs:
    j.start()
# Ensure all of the processes have finished
for j in jobs:
    j.join()

In your code, `procs` processes are created, and each of them calls `dataoperator` exactly once with the arguments `(N, i, vtkArray, field)`. The index `i` therefore never reaches a value greater than or equal to `procs`.

Rewrite the code so that it makes use of a queue (I assume `vtkArray` contains 100 different cases):

# Assuming python3.
from queue import Empty as EmptyException

resultQueue = multiprocessing.Queue()  # to return data from your workers
taskQueue = multiprocessing.Queue()
processes = []

def _worker(taskQueue, resultQueue, N, vtkArray, field):
    while True:
        try:
            # Block maximally 0.1s before EmptyException is thrown.
            i = taskQueue.get(timeout=0.1)
            ret = dataoperator(N, i, vtkArray, field)
            resultQueue.put(ret)
        except EmptyException:
            # Stop the loop.
            break

for i in range(len(vtkArray)):
    taskQueue.put(i)
for i in range(procs):
    process = multiprocessing.Process(target=_worker,
                                      args=(taskQueue, resultQueue, N, vtkArray, field),
                                      name="process%03d" % i)
    process.start()
    processes.append(process)
for p in processes:
    p.join()

try:
    rets = []
    while True:
        rets.append(resultQueue.get(block=False))
except EmptyException:
    pass

I have not tested this code; take it only as a starting point. I strongly recommend that you read the documentation of the multiprocessing module. (I would also suggest using contexts, but that is another story.)

Thanks to normanius' help I was able to set up the loading and interpolation of multiple files correctly. However, I am still unable to collect the data with resultQueue.get(). The list is always empty, even though the print statement for ret inside the _worker function shows the correct results:

To clarify my intent once more:

1) I set up a (globally defined) multidimensional array field = np.zeros([N, nx, ny, nz]), where N = len(vtkArray), vtkArray is simply a list of files to process (up to 100), and nx, ny, nz are the numbers of nodes used for the interpolation.

2) dataoperator() is the function that loads and interpolates the data, now using multiprocessing, where the index i corresponds both to the processor index and to the row of the multidimensional array field into which the loaded data is written.

3) The code suggested by normanius appears to run fine, since the print statement for ret inside the _worker function shows the correct result for each file.

4) Nevertheless, I still cannot obtain the fully "filled" multidimensional array via resultQueue.get(). Is it possible to return a multidimensional array directly, or do I have to use rets = [] as a list?

Thanks in advance for your help,

###################################################
# Loading all vtk files and saving to multidim array
###################################################
def dataoperator(i, vtkArray, nx, ny, nz, x0, x1, y0, y1, z0, z1, scalar):
    xi = np.linspace(x0, x1, nx, endpoint=True)
    yi = np.linspace(y0, y1, ny, endpoint=True)
    zi = np.linspace(z0, z1, nz, endpoint=True)
    yy, xx, zz = np.meshgrid(yi, xi, zi)
    # Generate MultiDimensional array
    vtkArrayString = [str(x) for x in vtkArray[:, 1]]
    print("Loading data " + str(prefix) + str(vtkArrayString[i]) + ' with index ' + str(i))
    points, cells, point_data, cell_data, field_data = meshio.read('./VTK/' + str(vtkArrayString[i]))
    # points, point_data = loaddata(vtkArrayString, i)
    x, y, z = np.transpose(points)
    print("\t Interpolating data " + str(vtkArrayString[i]))
    from scipy.interpolate import griddata
    if (scalar == 'p'):
        p = np.transpose(point_data['p'])
        pi = griddata((x, y, z), p, (xx, yy, zz), method='nearest')
        field[i, :, :, :] = pi
    else:
        u, v, w = np.transpose(point_data['subtract(U,U_bf)'])
        if (scalar == 'u'):
            ui = griddata((x, y, z), u, (xx, yy, zz), method='nearest')
            field[i, :, :, :] = ui
        elif (scalar == 'v'):
            vi = griddata((x, y, z), v, (xx, yy, zz), method='nearest')
            field[i, :, :, :] = vi
        else:
            wi = griddata((x, y, z), w, (xx, yy, zz), method='nearest')
            field[i, :, :, :] = wi
    # return field, vtkArray
    print("\t Finished Interpolating data " + str(vtkArrayString[i]))
    return field
import multiprocessing
from queue import Empty as EmptyException

resultQueue = multiprocessing.Queue()  # to return data from your workers
taskQueue = multiprocessing.Queue()
processes = []

def _worker(taskQueue, resultQueue, i, vtkArray):
    try:
        # Block maximally 0.1s before EmptyException is thrown.
        i = taskQueue.get(timeout=0.1)
        ret = dataoperator(i, vtkArray, nx, ny, nz, x0, x1, y0, y1, z0, z1, scalar)
        print(ret)
        resultQueue.put(ret)
    except EmptyException:
        # Idle action, if needed.
        pass

for i in range(steps):
    taskQueue.put(i)
for i in range(procs):
    process = multiprocessing.Process(target=_worker,
                                      args=(taskQueue, resultQueue, i, vtkArray),
                                      name="process%03d" % i)
    process.start()
    processes.append(process)
# ret2 = np.zeros([N, nx, ny, nz])
for p in processes:
    p.join()
try:
    rets = []
    while True:
        rets.append(resultQueue.get(block=False))
        print(rets)
except EmptyException:
    pass
print(rets)
field = np.array(rets)
print(field)
