I'm new to multiprocessing. When I run the code below, I get errors, and they are different every time. Here is my code:
```python
import numpy as np
import multiprocessing as mp
mesh_num=256
mean=[0,0]
cov=[[10,0],[0,20]]
position=np.random.multivariate_normal (mean,cov,size=4)
x_range=np.linspace(np.min(position[:,0]),np.max(position[:,0]),mesh_num)
y_range=np.linspace(np.min(position[:,1]),np.max(position[:,1]),mesh_num)
x_step=x_range[1]-x_range[0]
y_step=y_range[1]-y_range[0]
x_range=np.linspace(np.min(position[:,0])-x_step,np.max(position[:,0])+x_step,mesh_num)
y_range=np.linspace(np.min(position[:,1])-y_step,np.max(position[:,1])+y_step,mesh_num)
x_step=x_range[1]-x_range[0]
y_step=y_range[1]-y_range[0]
charge_density=mp.Array('d',np.zeros(mesh_num*mesh_num))
position=position.tolist()
def distribute(position):
    column_index_left=int(abs(position[0]-x_range[0])/x_step)
    column_index_right=column_index_left+1
    row_index_up=int(abs(position[1]-y_range[0])/y_step)
    row_index_down=row_index_up+1
    print(position[0])
    print(position[1])
    print(x_range)
    print(y_range)
    print(column_index_left)
    print(column_index_right)
    print(row_index_up)
    print(row_index_down)
if __name__ == "__main__":
    pool = mp.Pool()
    pool.map(distribute,position)
    pool.close()
    pool.join()
```
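I found that x_range and y_range inside the function distribute are different from the two computed at module level. This confuses me; I thought they should be the same. How can I fix this?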
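I cannot reproduce the index-out-of-bounds problem. Frankly, I am still trying to follow your logic. But there is clearly another problem:

`+=` is not an atomic operation, so if two tasks might try to update the same index, you need to explicitly treat the operation as a critical section, for example: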
```python
with charge_density.get_lock():
    charge_density[mesh_num*row_index_up + column_index_left] += (1 - abs(x_range[column_index_left] - position[0])/x_step)*(1 - abs(y_range[row_index_up] - position[1])/y_step)
```
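A minimal, self-contained sketch of this critical-section pattern, using a toy task instead of the charge-density code: several pool workers each add 1.0 to the same element of an `mp.Array`, holding the array's built-in lock around the read-modify-write. The names `init_worker`, `add_one` and `count_in_parallel` are hypothetical, and the optional `ctx` parameter exists only so the caller can choose a start method.

```python
import multiprocessing as mp


def init_worker(shared_array):
    # Runs once in each pool worker: expose the shared array as a global.
    global counter
    counter = shared_array


def add_one(_):
    # "+=" on a shared element is read, add, write. Holding the array's
    # lock stops another worker from interleaving between those steps.
    with counter.get_lock():
        counter[0] += 1.0


def count_in_parallel(n_tasks, ctx=None):
    # ctx selects the start method; None means the platform default.
    ctx = mp.get_context() if ctx is None else ctx
    shared = ctx.Array('d', 1)  # one double, zero-initialised, with a lock
    with ctx.Pool(4, initializer=init_worker, initargs=(shared,)) as pool:
        pool.map(add_one, range(n_tasks))
    return shared[0]
```

Call `count_in_parallel` from under an `if __name__ == "__main__":` guard in a saved script. With the lock held, the result should equal `n_tasks`; without the `with` line, concurrent updates can overwrite each other and the total may come out lower.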
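You can hold the lock for all four `+=` operations, or release and reacquire it around each one. If parallel tasks can never update the same index, the code will run faster if you create a `RawArray` instead of an `Array`, because a `RawArray` has no lock to acquire.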
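When every task writes only to its own index, the lock can be dropped. A sketch under that assumption, with a hypothetical `square_into` task that fills element `i` of a `RawArray`:

```python
import multiprocessing as mp


def init_worker(shared_array):
    # Runs once in each pool worker: expose the shared array as a global.
    global out
    out = shared_array


def square_into(i):
    # Each task writes only to element i, so no two tasks ever touch the
    # same element and no lock is needed.
    out[i] = float(i * i)


def squares_in_parallel(n, ctx=None):
    # ctx selects the start method; None means the platform default.
    ctx = mp.get_context() if ctx is None else ctx
    shared = ctx.RawArray('d', n)  # zero-initialised, no lock attached
    with ctx.Pool(4, initializer=init_worker, initargs=(shared,)) as pool:
        pool.map(square_into, range(n))
    return list(shared)
```

A `RawArray` has no `get_lock()` method, so this pattern is only safe while the indices are disjoint. In the charge-density code, two particles in the same or neighbouring cells update shared mesh nodes, which breaks that assumption.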
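Also, you asked, "Can you run my code on your computer?" This code only works on platforms that use fork to create new processes, such as Linux. On Windows, for example, which uses spawn (as macOS also does by default since Python 3.8), each process in the pool re-imports your main module and therefore creates its own instance of the shared-memory array. That's not very shared, is it? The same re-import explains why x_range and y_range look different inside distribute: every worker re-executes the module-level code, including np.random.multivariate_normal, so it builds its mesh from a different set of random positions. For portability, the code needs to be reorganized as follows: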
```python
import numpy as np
import multiprocessing as mp


def init_pool_processes(*args):
    # Runs once in every pool worker: store the data computed by the parent
    # as globals so that distribute() sees the same values in every process.
    global mesh_num, x_range, y_range, x_step, y_step, charge_density
    mesh_num, x_range, y_range, x_step, y_step, charge_density = args


def distribute(position):
    column_index_left = int(abs(position[0] - x_range[0])/x_step)
    column_index_right = column_index_left + 1
    row_index_up = int(abs(position[1] - y_range[0])/y_step)
    row_index_down = row_index_up + 1
    # Hold the lock for all four read-modify-write updates.
    with charge_density.get_lock():
        charge_density[mesh_num*row_index_up + column_index_left] += (1 - abs(x_range[column_index_left] - position[0])/x_step)*(1 - abs(y_range[row_index_up] - position[1])/y_step)
        charge_density[mesh_num*row_index_up + column_index_right] += (1 - abs(x_range[column_index_right] - position[0])/x_step)*(1 - abs(y_range[row_index_up] - position[1])/y_step)
        charge_density[mesh_num*row_index_down + column_index_left] += (1 - abs(x_range[column_index_left] - position[0])/x_step)*(1 - abs(y_range[row_index_down] - position[1])/y_step)
        charge_density[mesh_num*row_index_down + column_index_right] += (1 - abs(x_range[column_index_right] - position[0])/x_step)*(1 - abs(y_range[row_index_down] - position[1])/y_step)


if __name__ == "__main__":
    # This block runs only in the parent process, so the random positions
    # and the mesh are generated exactly once.
    mesh_num = 256
    mean = [0, 0]
    cov = [[10, 0], [0, 20]]
    position = np.random.multivariate_normal(mean, cov, size=100000)
    x_range = np.linspace(np.min(position[:,0]), np.max(position[:,0]), mesh_num)
    y_range = np.linspace(np.min(position[:,1]), np.max(position[:,1]), mesh_num)
    x_step = x_range[1] - x_range[0]
    y_step = y_range[1] - y_range[0]
    x_range = np.linspace(np.min(position[:,0]) - x_step, np.max(position[:,0]) + x_step, mesh_num)
    y_range = np.linspace(np.min(position[:,1]) - y_step, np.max(position[:,1]) + y_step, mesh_num)
    x_step = x_range[1] - x_range[0]
    y_step = y_range[1] - y_range[0]
    charge_density = mp.Array('d', mesh_num*mesh_num)  # zero-initialised
    # Pass the workers only what distribute() reads; position is sent via map().
    pool = mp.Pool(initializer=init_pool_processes, initargs=(
        mesh_num,
        x_range,
        y_range,
        x_step,
        y_step,
        charge_density
    ))
    pool.map(distribute, position)
    pool.close()
    pool.join()
    # View the shared buffer as a mesh_num x mesh_num NumPy array.
    result = np.frombuffer(charge_density.get_obj()).reshape(mesh_num, mesh_num)
```
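The x_range/y_range mismatch described in the question can be reproduced with a much smaller script. In this sketch (the names `value`, `report` and `compare` are hypothetical), a random number is drawn at module level, which is exactly where the original code draws `position`:

```python
import multiprocessing as mp

import numpy as np

# Module-level code. Under "fork" the workers inherit this value; under
# "spawn" every worker re-imports the module and draws a new one.
value = np.random.rand()


def report(_):
    # Return whatever this worker process sees as the module-level value.
    return value


def compare(method):
    """Return the parent's value and the value each pool task observed."""
    ctx = mp.get_context(method)
    with ctx.Pool(2) as pool:
        worker_values = pool.map(report, range(4))
    return value, worker_values
```

Run from a saved script under an `if __name__ == "__main__":` guard, `compare("fork")` (Linux only) should report the parent's value for every task, because the workers inherit it, whereas `compare("spawn")` should report values that differ from the parent's, because each worker re-imported the module and drew its own.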
I also tried to apply some of the recommendations from PEP 8 – Style Guide for Python Code. Have a look at it when you get a chance.
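To sanity-check the parallel result, it can help to compute the same deposit serially. The sketch below is a swapped-in technique rather than part of the answer: it vectorizes distribute()'s index and weight arithmetic over all particles with NumPy and uses `np.add.at`, which accumulates correctly when the same mesh index appears more than once (a plain `density[idx] += w` with repeated indices would not). `deposit_serial` is a hypothetical name, and it assumes the mesh is padded by one step on each side, as in the answer's code.

```python
import numpy as np


def deposit_serial(position, x_range, y_range):
    """Deposit one unit of charge per particle onto a flat mesh, serially.

    Repeats distribute()'s index and weight arithmetic for all particles
    at once. Assumes every position lies inside the padded mesh.
    """
    mesh_num = len(x_range)
    x_step = x_range[1] - x_range[0]
    y_step = y_range[1] - y_range[0]
    px, py = position[:, 0], position[:, 1]
    # Same truncation as int(abs(...)/step) for positions right of the origin.
    col_left = (np.abs(px - x_range[0]) / x_step).astype(int)
    row_up = (np.abs(py - y_range[0]) / y_step).astype(int)
    density = np.zeros(mesh_num * mesh_num)
    for d_row in (0, 1):
        for d_col in (0, 1):
            row = row_up + d_row
            col = col_left + d_col
            weight = ((1 - np.abs(x_range[col] - px) / x_step)
                      * (1 - np.abs(y_range[row] - py) / y_step))
            # Unbuffered add: repeated indices each contribute their weight.
            np.add.at(density, mesh_num * row + col, weight)
    return density
```

Because a particle's two x-weights add up to 1, as do its two y-weights, each particle contributes a total charge of 1, so `deposit_serial(...).sum()` should be close to the number of particles. The same check applies to the parallel `charge_density`.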