将元素从C 11线程安全地添加到向量中



我的程序需要生成大量样品字符串,并且由于生成字符串在计算上是密集的,所以我想并行化该过程。我的代码是这样:

mutex mtx;
void my_thread(vector<string> &V, int length)
{
     string s=generate_some_string(length);  //computationally intensive part
      mtx.lock();
       V.push_back(s);
      mtx.unlock();

}
int main()
{
   vector<string> S;
   while(S.size()<1000)
  {
    vector<thread> ths;
    ths.resize(10);
    for(int i=0; i<10;i++)
    {
       ths[i]=thread(my_thread,ref(S),100 );
    }
    for(auto &th: ths)  th.join();

  }  

}

我运行时会出现"双重或损坏"错误。

您的代码

您对线程的使用通常看起来很正确,因此问题可能是generate_some_string影响全局状态。您可以通过以下方法解决此问题:

  • 使用更好的库。

  • 使用MPI进行并行性,因为它将产生具有单独记忆的过程。

平行哲学

回顾过去似乎很明显,因此存在一个问题,即为什么它并不明显。我认为这与您实现并行性的方式有关。

C 11线程为您提供了很大的灵活性,但它也要求您明确构建并行性。大多数时候,这是不是您想要的。提供有关如何平行代码并让其负责低级详细信息的编译器信息更容易,更小的货物。

以下显示了如何使用OpenMP进行此操作:所有现代编译器中包含并广泛用于高性能计算中的行业标准编译器领域集。

您会注意到,代码通常比您编写的内容更容易阅读,从而更容易调试。

下面的所有代码将使用命令编译(适当地为您的编译器进行了修改:

g++ -O3 main.cpp -fopenmp

解决方案0:使用平行的更轻松的形式

首先,我建议将OpenMP用于您的并行性。这是一种行业标准,消除了必须处理线程的许多痛苦,并允许您在概念层面上表达并行性。

解决方案1:私有内存

您可以通过将每个线程写入自己的私人记忆,然后将私人记忆合并在一起来解决问题。这完全避免了静音,这可能会导致更快的代码,并可能避免您完全遇到的问题。

请注意,每个线程都会产生多个计算密集的字符串,但是这项工作会自动划分在可用线程之间。是

#include <vector>
#include <string>
#include <omp.h>
#include <cmath>
#include <thread>
#include <chrono>
#include <iostream>
const int STRINGS_PER_LENGTH = 10;
const int MAX_STRING_LENGTH  = 50;
using namespace std::chrono_literals;
//Computationally intensive string generation. Note that this function
//CANNOT have a global state, or the threads will maul it.
std::string GenerateSomeString(int length){
  double sum=0;
  for(int i=0;i<length;i++){
    std::this_thread::sleep_for(2ms);
    sum+=std::sqrt(i);
  }
  return std::to_string(sum);
}
int main(){
  //Build a vector that contains vectors of strings. Each thread will have its
  //own vector of strings
  std::vector< std::vector<std::string> > vecs(omp_get_max_threads());
  //Loop over lengths
  for(int length=10;length<MAX_STRING_LENGTH;length++){
    //Progress so the user does not get impatient
    std::cout<<length<<std::endl;
    //Parallelize across all cores
    #pragma omp parallel for
    for(int i=0;i<STRINGS_PER_LENGTH;i++){
      //Each thread independently generates its string and puts it into its own
      //private memory space
      vecs[omp_get_thread_num()].push_back(GenerateSomeString(length));
    }
  }
  //Merge all the threads' results together
  std::vector<std::string> S;
  for(auto &v: vecs)
    S.insert(S.end(),v.begin(),v.end());
  //Throw away the thread private memory
  vecs.clear();
  vecs.shrink_to_fit();
}

解决方案2:使用减少

我们可以将自定义还原操作员定义为合并向量。在代码的并行部分中使用此操作员,使我们能够消除向量的向量和后来的清理。相反,当线程完成工作时,OpenMP安全地处理了结合结果。

#include <vector>
#include <string>
#include <omp.h>
#include <cmath>
#include <thread>
#include <chrono>
#include <iostream>
using namespace std::chrono_literals;
const int STRINGS_PER_LENGTH = 10;
const int MAX_STRING_LENGTH  = 50;    
//Computationally intensive string generation. Note that this function
//CANNOT have a global state, or the threads will maul it.
std::string GenerateSomeString(int length){
  double sum=0;
  for(int i=0;i<length;i++){
    std::this_thread::sleep_for(2ms);
    sum+=std::sqrt(i);
  }
  return std::to_string(sum);
}
int main(){
  //Global vector, must not be accessed by individual threads
  std::vector<std::string> S;
  #pragma omp declare reduction (merge : std::vector<std::string> : omp_out.insert(omp_out.end(), omp_in.begin(), omp_in.end()))
  //Loop over lengths
  for(int length=10;length<50;length++){
    //Progress so the user does not get impatient
    std::cout<<length<<std::endl;
    //Parallelize across all cores
    std::vector<std::string> private_memory;
    #pragma omp parallel for reduction(merge: private_memory)
    for(int i=0;i<STRINGS_PER_LENGTH;i++){
      //Each thread independently generates its string and puts it into its own
      //private memory space
      private_memory.push_back(GenerateSomeString(length));
    }
  }
}

解决方案3:使用critical

我们可以通过将push_back放入关键部分来完全消除减排,该部分将对该代码的该部分的访问限制为一次线程。

//Compile with g++ -O3 main.cpp -fopenmp
#include <vector>
#include <string>
#include <omp.h>
#include <cmath>
#include <thread>
#include <chrono>
#include <iostream>
using namespace std::chrono_literals;
const int STRINGS_PER_LENGTH = 10;
const int MAX_STRING_LENGTH  = 50;    
//Computationally intensive string generation. Note that this function
//CANNOT have a global state, or the threads will maul it.
std::string GenerateSomeString(int length){
  double sum=0;
  for(int i=0;i<length;i++){
    std::this_thread::sleep_for(2ms);
    sum+=std::sqrt(i);
  }
  return std::to_string(sum);
}
int main(){
  //Global vector, must not be accessed by individual threads
  std::vector<std::string> S;
  //Loop over lengths
  for(int length=10;length<50;length++){
    //Progress so the user does not get impatient
    std::cout<<length<<std::endl;
    //Parallelize across all cores
    #pragma omp parallel for
    for(int i=0;i<STRINGS_PER_LENGTH;i++){
      //Each thread independently generates its string and puts it into its own
      //private memory space
      const auto temp = GenerateSomeString(length);
      //Only one thread can access this part of the code at a time
      #pragma omp critical
      S.push_back(temp);
    }
  }
}

最新更新