我不知道为什么这个 MPI 代码中的 while 循环没有中断



我正在使用mpi4py进行并行化练习,其中投掷2个骰子的次数是定义的(除以进程,即npp),并计算点数。结果存储在字典中,计算平均偏差,直到条件为当mean_dev小于0.001时,投掷次数翻倍。

所有这些都如预期的那样工作,问题是代码没有退出。条件满足,不再输出,但代码挂起。

from ctypes.wintypes import SIZE
from dice import * #This is just a class that creates the dictionaries 
from random import randint
import matplotlib.pyplot as plt
import numpy as np
from mpi4py import MPI
from math import sqrt
def simulation(f_events, f_sides, f_n_dice):
f_X = dice(sides, n_dice).myDice() #Nested dictionary composed of dices (last dict stores the sum)
for j in range(f_events): #for loop to handle all the dice throwings aka events
n = [] #List to store index respective to number on each dice
for i in range(1, f_n_dice+1): #for cycle for each dice
k = randint(1, f_sides) #Random number
n.append(k)
f_X[i][k] += 1 #The index (k) related to each throw is increased for the dice (i)
sum_throw = sum(n) #Sum of the last throw
f_X[f_n_dice+1][sum_throw] += 1 #Sum dictionary "increases" the index respective to the sum of the last throw
return f_X
npp = int(4)//4 #Number of events divided by the number of processes
sides = 6 #Number of sides per dice
n_dice = 2 #Number of dices
comm = MPI.COMM_WORLD #Communicator to handle point-to-point communication
rank = comm.Get_rank() #Hierarchy of processes
size = comm.Get_size() #Number of processes
#-------------------- Parallelization portion of the code --------------------#
seq = (2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12)
AUX = dict.fromkeys(seq, 0)
mean_dev = 1
while True:
msg = comm.bcast(npp, root = 0)
print("---> msg: ", msg, " for rank ", rank)
print("The mean dev for %d" %rank + " is: ", mean_dev)
D = simulation(npp, sides, n_dice)

Dp = comm.gather(D, root = 0)
print("This is Dp: ", Dp)

summ = 0
prob = [1/36, 2/36, 3/36, 4/36, 5/36, 6/36, 5/36, 4/36, 3/36, 2/36, 1/36]
if rank==0:
for p in range(0, size): 
for n in range(dice().min, dice().max+1): #Range from minimum sum possible to the maximum sum possible depending on the number of dices used
AUX[n] += Dp[p][n_dice+1][n] #Adds the new data to the final sum dictionary 
#of the previously initiated nested dictionary
print(Dp[p][n_dice+1])

print("The final dictionary is: ", AUX, sum(AUX[j] for j in AUX))
for i in range(dice().min, dice().max+1):
exp = (prob[i-2])*(sum(AUX[j] for j in AUX))
x = (AUX[i]-exp)/exp
summ = summ + pow(x, 2)
mean_dev = (1/11)*sqrt(summ)
print("The deviation for {} is {}.".format(sum(AUX[j] for j in AUX), mean_dev))
if mean_dev > 0.001:
npp = 2*npp
# new_msg = comm.bcast(npp, root = 0)
# print("---> new_msg: ", new_msg, " for rank ", rank)
else:
break

我被这个难住了。提前感谢任何输入!


新代码的解决方案由@victor-eijkhout:

from ctypes.wintypes import SIZE
from dice import *
from random import randint
import matplotlib.pyplot as plt
import numpy as np
from mpi4py import MPI
from math import sqrt
def simulation(f_events, f_sides, f_n_dice):
f_X = dice(sides, n_dice).myDice() #Nested dictionary composed of dices (last dict stores the sum)
for j in range(f_events): #for loop to handle all the dice throwings aka events
n = [] #List to store index respective to number on each dice
for i in range(1, f_n_dice+1): #for cycle for each dice
k = randint(1, f_sides) #Random number
n.append(k)
f_X[i][k] += 1 #The index (k) related to each throw is increased for the dice (i)
sum_throw = sum(n) #Sum of the last throw
f_X[f_n_dice+1][sum_throw] += 1 #Sum dictionary "increases" the index respective to the sum of the last throw
return f_X
npp = int(4)//4 #Number of events divided by the number of processes
sides = 6 #Number of sides per dice
n_dice = 2 #Number of dices
comm = MPI.COMM_WORLD #Communicator to handle point-to-point communication
rank = comm.Get_rank() #Hierarchy of processes
size = comm.Get_size() #Number of processes
#-------------------- Parallelization portion of the code --------------------#
seq = (2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12)
AUX = dict.fromkeys(seq, 0)
mean_dev = 1
while True:
msg = comm.bcast(npp, root = 0)
#print("---> msg: ", msg, " for rank ", rank)

D = simulation(npp, sides, n_dice)

Dp = comm.gather(D, root = 0)
#if Dp != None: print("This is Dp: ", Dp)

#print("The mean dev for %d" %rank + " is: ", mean_dev)
if rank==0:

summ = 0
prob = [1/36, 2/36, 3/36, 4/36, 5/36, 6/36, 5/36, 4/36, 3/36, 2/36, 1/36]
for p in range(0, size): 
for n in range(dice().min, dice().max+1): #Range from minimum sum possible to the maximum sum possible depending on the number of dices used
AUX[n] += Dp[p][n_dice+1][n] #Adds the new data to the final sum dictionary 
#of the previously initiated nested dictionary
print(Dp[p][n_dice+1])

print("The final dictionary is: ", AUX, sum(AUX[j] for j in AUX))
for i in range(dice().min, dice().max+1):
exp = (prob[i-2])*(sum(AUX[j] for j in AUX))
x = (AUX[i]-exp)/exp
summ = summ + pow(x, 2)
mean_dev = (1/11)*sqrt(summ)
print("The deviation for {} is {}.".format(sum(AUX[j] for j in AUX), mean_dev))
#new_mean_dev = comm.gather(mean_dev, root = 0)
new_mean_dev = comm.bcast(mean_dev, root = 0)
print("---> msg2: ", new_mean_dev, " for rank ", rank)
if new_mean_dev < 0.001:
break
# new_msg = comm.bcast(npp, root = 0)
# print("---> new_msg: ", new_msg, " for rank ", rank)

else:
npp = 2*npp
print("The new npp is: ", npp)

Git repo与其他并行化问题和解决方案:https://github.com/davidmcarreira/parallel-computing

您只计算过程0的平均偏差,因此该过程将退出。但是,其他进程不会获得该值,因此它们永远不会退出。你应该在计算完之后广播这个值。

您正在跳出if语句。只要把True:换成while mean_dev > 0.001:就可以了。您也可以在最后进行赋值,而不是将其包装在if中。

如果这不起作用,这仅仅意味着mean_dev总是大于0.001。将mean_dev计算为(1/11)*sqrt(sum …)。不是在整个算法下,如果2个骰子的最小和是2,那么mean_dev不会降到0.14左右。尝试在每次循环中放入一个print语句并打印出mean_dev,看看它是否如预期的那样工作。你应该每次都用mean_dev除以npp或者类似的东西吗?

作为一般规则,当估计的变化变得非常小时,人们迭代寻找更接近的近似值的这类问题通常会终止。当mean_dev的变化小于0.001时,应该停止吗?您需要执行类似abs(last_mean_dev-mean_dev)<0.001 .

最新更新