所以,我正在做这个项目,我正在用教堂计算语言写作。我已经编写了该程序,并且在非并行运行时可以完美运行。
但是当我添加需要并行化的forall
语句时,该程序确实运行得更快,但它没有提供我需要的结果。我知道这是因为当我这样做时,我在步骤 1、3、5 和 7 中有一个竞争条件j = j - 1;
所以我尝试j
一个同步变量以防止这个竞争条件破坏我的结果,然后我编译并运行,我的程序永远不会退出步骤 1, 这是第一个同步变量所在的地方,所以我有理由相信这是因为同步的变量,j
.
如果有人对我应该如何并行化或同步以便对最终网格进行排序有任何见解,那就太好了。代码如下:
//SlideSort.chapel_version
//
use Random;
use Time;
config const seed = 2345;
var rs = new RandomStream(seed);
var num: int;
var transferMesh:[0..36530] int;
var copyMesh:[0..36530] int;
var mesh:[0..37883] int;
var i: int;
var z: int;
proc slideSort(){
writeln("nSorting..n");
/*-------------------------Step 1-------------------------------*/
writeln("Step 1");
forall i in 0..36530{
transferMesh[i] = mesh[i+677];
}//end for
forall i in 0..1352{
forall a in 0..26{
var value: int = transferMesh[1353*a+i];
var j$: int = 1353*a+i-1;
while j$>= 1353*a && transferMesh[j$] > value{
transferMesh[j$+1] = transferMesh[j$];
j$ = j$ - 1;
}//end While
transferMesh[j$+1] = value;
}//end a_for
}//end i_for
forall k in 0..36530{
mesh[k+677] = transferMesh[k];
}//end For_k
/*--------------------------Step 2--------------------------------*/
writeln("Step 2");
forall k in 677..37207{
transferMesh[k-677] = mesh[k];
}//end k_for
forall k in 0..1352{
for z in 0..26{
copyMesh[k+1353*z] = transferMesh[27*k+z];
}//end z_for
}//end k_for
forall k in 0..36530{
mesh[k+677] = copyMesh[k];
}//end k_for
/*--------------------------Step 3--------------------------------*/
writeln("Step 3");
forall i in 0..36530{
transferMesh[i] = mesh[i+677];
}//end for
forall i in 0..1352{
forall a in 0..26{
var value: int = transferMesh[1353*a+i];
var j: int = 1353*a+i-1;
while j>= 1353*a && transferMesh[j] > value{
transferMesh[j+1] = transferMesh[j];
j = j - 1;
}//end While
transferMesh[j+1] = value;
}//end a_for
}//end i_for
forall k in 0..36530{
mesh[k+677] = transferMesh[k];
}//end For_k
/*--------------------------Step 4--------------------------------*/
writeln("Step 4");
forall k in 677..37207{
transferMesh[k-677] = mesh[k];
}//end for
forall k in 0..1352{
forall z in 0..26{
copyMesh[27*k+z] = transferMesh[k+1353*z];
}//end z_for
}//end k_for
forall k in 0..36530{
mesh[k+677] = copyMesh[k];
}//end k_for
/*--------------------------Step 5--------------------------------*/
writeln("Step 5");
forall i in 0..36530{
transferMesh[i] = mesh[i+677];
}//end for
forall i in 0..1352{
forall a in 0..26{
var value: int = transferMesh[1353*a+i];
var j: int = 1353*a+i-1;
while j>= 1353*a && transferMesh[j] > value{
transferMesh[j+1] = transferMesh[j];
j = j - 1;
}//end While
transferMesh[j+1] = value;
}//end a_for
}//end i_for
forall k in 0..36530{
mesh[k+677] = transferMesh[k];
}//end For_k
/*--------------------------Step 6--------------------------------*/
writeln("Step 6");
/*This is the padding step, so, since I already have a padded array
I will simply just sort my padded array in step 7
*/
/*--------------------------Step 7--------------------------------*/
writeln("Step 7n");
forall k in 0..1352{
forall a in 0..27{
var value: int = mesh[1353*a+k];
var j: int = 1353*a+k-1;
while j >= 1353 *a && mesh[j] > value{
mesh[j+1] = mesh [j];
j = j - 1;
}//end while
mesh[j+1] = value;
}//end a_for
}//end k_for
}//slideSort END
/*^^^^^^^^^^^^^^^^^^^^^^^end slidesort^^^^^^^^^^^^^^^^^^^^^^^^^^^^*/
proc isSorted() :int{
var sorted: int = 0;
for i in 0..37882{
if mesh[i+1] < mesh[i]{
writeln("NOT SORTED :(");
sorted = 1;
break;
}//if
}//end For
return sorted;
}// isSorted END
proc main(){
/*Padding array with -INF*/
for i in 0..676 do mesh[i] = -1000000;
/*Filling array with random ints*/
for i in 677..37207{
num = (301 * rs.getNext()): int;
mesh[i] = num;
}//end for
/*Padding array with +INF*/
for i in 37208..37883 do mesh[i] = 1000000;
writeln("n200th Element: ", mesh[199],"n1000th Element: ", mesh[999]);
writeln("30000th Element: ", mesh[29999], "n37300th Element: ", mesh[37299]);
const startTime = getCurrentTime();
slideSort();
const endTime = getCurrentTime();
writeln("n200th Element: ", mesh[199], "n1000th Element: ", mesh[999]);
writeln("30000th Element: ", mesh[29999], "n37300th Element: ", mesh[37299]);
writeln("nnElapsed time was: ", (endTime - startTime));
if(isSorted()==0) then writeln("nYep the mesh is sorted via SlideSort :)");
write("nWould you like to print out every 100th element of the Mesh?n",
"'Y' for yesn",
"'N' for non");
var print: string = "N";
print = stdin.read(string);
if print == "Y" || print == "y"{
for i in 0..37883 by 100{
write(mesh[i],"n");
}//end innerFor
}//end if
}//end MAIN
它与同步变量没有太大关系。您的双循环如下所示:
forall i in 0..1352 do
forall a in 0..26
{
var j = 1353*a+i;
var v = transferMesh[j];
while( j>= 1353*a )
{
if( transferMesh[j] <= v )
break;
transferMesh[j+1] = transferMesh[j];
j--;
}
transferMesh[j+1] = v;
}
这是数据竞赛的一个明显来源。您测试 transferMesh[j]
,但由于另一个具有相同 a 的 i 可以同时迭代,因此它可以同时修改该值,从而导致未定义的结果。
对于这些循环中的每一个,每个块应该只允许一个工作线程(因此每个值为 a)。因此,您应该只在 a 上并行迭代,即 forall a in 0..26 do for i in 0..1352 {...}
因此,很容易获得更正后的代码:
//SlideSort.chapel_version
//
use Random;
use Time;
config const seed = 2345;
var rs = new RandomStream(seed);
var num: int;
var transferMesh:[0..36530] int;
var copyMesh:[0..36530] int;
var mesh:[0..37883] int;
var i: int;
var z: int;
proc slideSort()
{
writeln("nSorting..n");
/*-------------------------Step 1-------------------------------*/
writeln("Step 1");
forall i in 0..36530
{
transferMesh[i] = mesh[i+677];
}//end for
forall a in 0..26
{
for i in 0..1352
{
var value: int = transferMesh[1353*a+i];
var j: int = 1353*a+i-1;
while j>= 1353*a && transferMesh[j] > value
{
transferMesh[j+1] = transferMesh[j];
j = j - 1;
}//end While
transferMesh[j+1] = value;
}//end i_for
}//end a_for
forall k in 0..36530
{
mesh[k+677] = transferMesh[k];
}//end For_k
/*--------------------------Step 2--------------------------------*/
writeln("Step 2");
forall k in 677..37207
{
transferMesh[k-677] = mesh[k];
}//end k_for
forall z in 0..26
{
forall k in 0..1352
{
copyMesh[k+1353*z] = transferMesh[27*k+z];
}//end k_for
}//end z_for
forall k in 0..36530
{
mesh[k+677] = copyMesh[k];
}//end k_for
/*--------------------------Step 3--------------------------------*/
writeln("Step 3");
forall i in 0..36530
{
transferMesh[i] = mesh[i+677];
}//end for
forall a in 0..26
{
for i in 0..1352
{
var value: int = transferMesh[1353*a+i];
var j: int = 1353*a+i-1;
while j>= 1353*a && transferMesh[j] > value
{
transferMesh[j+1] = transferMesh[j];
j = j - 1;
}//end While
transferMesh[j+1] = value;
}//end i_for
}//end a_for
forall k in 0..36530
{
mesh[k+677] = transferMesh[k];
}//end For_k
/*--------------------------Step 4--------------------------------*/
writeln("Step 4");
forall k in 677..37207
{
transferMesh[k-677] = mesh[k];
}//end for
forall k in 0..1352
{
for z in 0..26
{
copyMesh[27*k+z] = transferMesh[k+1353*z];
}//end z_for
}//end k_for
forall k in 0..36530
{
mesh[k+677] = copyMesh[k];
}//end k_for
/*--------------------------Step 5--------------------------------*/
writeln("Step 5");
forall i in 0..36530
{
transferMesh[i] = mesh[i+677];
}//end for
forall a in 0..26
{
for i in 0..1352
{
var value: int = transferMesh[1353*a+i];
var j: int = 1353*a+i-1;
while j>= 1353*a && transferMesh[j] > value
{
transferMesh[j+1] = transferMesh[j];
j = j - 1;
}//end While
transferMesh[j+1] = value;
}//end i_for
}//end a_for
forall k in 0..36530
{
mesh[k+677] = transferMesh[k];
}//end For_k
/*--------------------------Step 6--------------------------------*/
writeln("Step 6");
/*This is the padding step, so, since I already have a padded array
I will simply just sort my padded array in step 7
*/
/*--------------------------Step 7--------------------------------*/
writeln("Step 7n");
forall a in 0..27
{
for k in 0..1352
{
var value: int = mesh[1353*a+k];
var j: int = 1353*a+k-1;
while j >= 1353 *a && mesh[j] > value
{
mesh[j+1] = mesh [j];
j = j - 1;
}//end while
mesh[j+1] = value;
}//end k_for
}//end a_for
}//slideSort END
/*^^^^^^^^^^^^^^^^^^^^^^^end slidesort^^^^^^^^^^^^^^^^^^^^^^^^^^^^*/
proc isSorted() :int
{
var sorted: int = 0;
for i in 0..37882
{
if mesh[i+1] < mesh[i]
{
writeln("NOT SORTED :(");
sorted = 1;
break;
}//if
}//end For
return sorted;
}// isSorted END
proc main()
{
/*Padding array with -INF*/
for i in 0..676 do mesh[i] = -1000000;
/*Filling array with random ints*/
for i in 677..37207
{
num = (301 * rs.getNext()): int;
mesh[i] = num;
}//end for
/*Padding array with +INF*/
for i in 37208..37883 do mesh[i] = 1000000;
writeln("n200th Element: ", mesh[199],"n1000th Element: ", mesh[999]);
writeln("30000th Element: ", mesh[29999], "n37300th Element: ", mesh[37299]);
const startTime = getCurrentTime();
slideSort();
const endTime = getCurrentTime();
writeln("n200th Element: ", mesh[199], "n1000th Element: ", mesh[999]);
writeln("30000th Element: ", mesh[29999], "n37300th Element: ", mesh[37299]);
writeln("nnElapsed time was: ", (endTime - startTime));
if(isSorted()==0) then writeln("nYep the mesh is sorted via SlideSort :)");
}//end MAIN
结果:
$ chpl sort.chpl ; ./a.out | tail -n 1
Yep the mesh is sorted via SlideSort :)