多线程和动态数组/矩阵操作



我目前正在编写一个物理模拟(t.b.m.p.求解随机微分方程),我需要并行化它。
现在这可以通过MPI实现,我想我将不得不在未来的某个时候这样做,但目前我想利用我的本地机器的所有8个核心。一个参数设置的正常运行需要2 - 17小时。因此我想利用多线程,特别是下面的函数应该并行执行。这个函数本质上对Nsteps时间步长求解相同的SDE Nrep。计算结果的平均值,并将每个线程存储在Nthreads x Nsteps数组JpmArr的单独一行中。

double **JpmArr;
void worker(const dtype gamma, const dtype dt, const uint seed, const uint Nsteps, const uint Nrep,
            const ESpMatD& Jplus, const ESpMatD& Jminus, const ESpMatD& Jz, const uint tId ){

dtype dW(0), stdDev( sqrt(dt) );
std::normal_distribution<> WienerDistr(0, stdDev);
//create the arrays for the values of <t|J+J-|t>
 dtype* JpmExpect = JpmArr[tId];
//execute Nrep repetitions of the experiment
for (uint r(0); r < Nrep; ++r) {
    //reinitialize the wave function
    psiVecArr[tId] = globalIstate;
    //<t|J+J-|t>
    tmpVecArr[tId] = Jminus* psiVecArr[tId];
    JpmExpect[0] += tmpVecArr[tId].squaredNorm();
    //iterate over the timesteps
    for (uint s(1); s < Nsteps; ++s) {
        //get a random number
        dW = WienerDistr(RNGarr[tId]);
        //execute one step of the RK-s.o. 1 scheme
        tmpPsiVecArr[tId] = F2(gamma, std::ref(Jminus), std::ref(psiVecArr[tId]) );
        tmpVecArr[tId] = psiVecArr[tId] + tmpPsiVecArr[tId] * sqrt(dt);
        psiVecArr[tId] = psiVecArr[tId] + F1(gamma, std::ref(Jminus), std::ref(Jplus),   std::ref(psiVecArr[tId])) * dt + tmpPsiVecArr[tId] * dW 
 + 0.5 * (F2(gamma, std::ref(Jminus), std::ref(tmpVecArr[tId]) ) - F2(gamma, std::ref(Jminus), std::ref(psiVecArr[tId]))) *(dW * dW - dt) / sqrt(dt);
        //normalise
        psiVecArr[tId].normalize();
        //compute <t|J+J-|t>
        tmpVecArr[tId] = Jminus* psiVecArr[tId];
        JpmExpect[s] += tmpVecArr[tId].squaredNorm();
    }
}

//average over the repetitions
for (uint j(0); j < Nsteps; ++j) {
    JpmExpect[j] /= Nrep;
}
}

我使用Eigen作为线性代数的库,因此:

typedef Eigen::SparseMatrix<dtype, Eigen::RowMajor> ESpMatD;
typedef Eigen::Matrix<dtype, Eigen::Dynamic, Eigen::RowMajor> VectorXdrm;

用作类型。上面的辅助函数调用:

VectorXdrm& F1(const dtype a, const ESpMatD& A, const ESpMatD& B, const VectorXdrm& v) {
z.setZero(v.size());
y.setZero(v.size());
// z is for simplification
z = A*v;
//scalar intermediate value c = <v, Av>
dtype c = v.dot(z);
y = a * (2.0 * c * z - B * z - c * c * v);
return y;
}
VectorXdrm& F2(const dtype a, const ESpMatD& A, const VectorXdrm& v) {
//zero the data
z.setZero(v.size());
y.setZero(v.size());
z = A*v;
dtype c = v.dot(z);
y = sqrt(2.0 * a)*(z - c * v);
return y;
}

,其中向量z,y的类型为VectorXdrm,并且在同一个文件(module-global)中声明。所有数组(RNGarr, JpmArr, tmpPsiVecArr, tmpVecArr, psiVecArr)都在main中初始化(使用main.cpp中的extern声明)。设置完成后,我使用std::async运行该函数,等待所有操作完成,然后将JpmArr中的数据收集到main()中的单个数组中,并将其写入文件。

问题:结果是无稽之谈,如果我使用std::launch::async。如果我使用std::launch::deferred,计算和平均结果匹配(在数值方法允许的范围内)我通过解析方法获得的结果。

我再也不知道东西在哪里失败了。我曾经使用Armadillo进行线性代数,但是它的normalize例程提供了nan例程,所以我切换到Eigen,这提示(在文档中)可以用于多线程-它仍然失败。没有工作线程之前,我已经花了4天现在试图得到这个工作和阅读的东西。后者导致我使用全局数组RNGarr, JpmArr, tmpPsiVecArr, tmpVecArr, psiVecArr(在我刚刚尝试在worker中创建适当的数组并通过struct workerResult将结果传递回main之前)以及使用std::ref()将矩阵Jplus, Jminus, Jz传递给函数。(为了简洁,在上面的函数中省略了最后一个)

但是我得到的结果仍然是错误的,我不知道什么是错误的,我应该做什么才能得到正确的结果。对于此类(线程)问题或参考的示例解决方案的任何输入和/或指针将非常感谢。

显然在每个线程的计算之间有一些相互作用——要么是由于你的全局数据被多个线程更新,要么是一些通过引用传递的结构在运行时发生了变化——zy不能是全局的,如果它们被多个线程更新——但可能还有许多其他问题

我建议您按照如下方式重构代码;

    使它面向对象——定义一个自包含的类。取给worker的所有参数,并使它们成为类的成员。
  • 如果您不确定数据结构是否正在发生变化,则不要通过引用传递它们。如果有疑问,假设最坏的情况,并在类内制作完整的副本。
  • 在线程必须更新共享结构的情况下(在你的用例中应该没有),你需要保护互斥锁的读写,以实现独占访问。
  • 删除所有全局的——而不是拥有全局的JpmArr, zy,定义类中需要的数据。
  • 创建worker, F1, F2成员函数

然后在main中,根据需要多次new你新创建的类,并将它们作为一个线程启动——确保你在等待线程完成之前读取任何数据——每个线程将有自己的堆栈,并且在类中有自己的数据,因此并行计算之间的干扰变得不太可能。

如果你想进一步优化,你需要考虑每个矩阵计算作为job并创建一个线程池匹配核的数量,让每个线程按顺序接工作,这将减少上下文切换开销和CPU L1/L2高速缓存错过会发生如果你变得更多的线程数量比你的核心——然而这是成为一个更复杂的比你需要你的紧迫的问题…

停止使用全局变量。无论如何,这是糟糕的风格,这里的多个线程将在同一时间归零和改变zy

最简单的修复方法是将全局变量替换为worker函数中的局部变量——这样每个对worker的并发调用都有自己的副本——并通过引用将它们传递给F1F2

相关内容

  • 没有找到相关文章

最新更新