Boost group_threads最大并行线程数



我想在我的程序中应用boost group_thread的最大线程数。例如

int maxNumberOfThreads
boost::thread_group group;
 for (int i = 0; i < N; ++i)
      //create new if group.size() is smaller then maximal number of threads
      group.create_thread(Worker);
 group.join_all();

有人知道我怎么才能实现这个吗?

因为当我启动N个线程时效率会很低。

谢谢你的帮助

你似乎想要的是一个线程池。

您可以使用boost::thread::hardware_concurrency()来确定特定系统上可用的(逻辑)内核的数量。

这是我上周得到的答案:

#include <boost/thread.hpp>
#include <boost/phoenix.hpp>
#include <boost/optional.hpp>
using namespace boost;
using namespace boost::phoenix::arg_names;
boost::atomic_size_t counter(0ul);
class thread_pool
{
  private:
      mutex mx;
      condition_variable cv;
      typedef function<void()> job_t;
      std::deque<job_t> _queue;
      thread_group pool;
      boost::atomic_bool shutdown;
      static void worker_thread(thread_pool& q)
      {
          while (optional<job_t> job = q.dequeue())
              (*job)();
      }
  public:
      thread_pool() : shutdown(false) {
          for (unsigned i = 0; i < boost::thread::hardware_concurrency(); ++i)
              pool.create_thread(bind(worker_thread, ref(*this)));
      }
      void enqueue(job_t job) 
      {
          lock_guard<mutex> lk(mx);
          _queue.push_back(job);
          cv.notify_one();
      }
      optional<job_t> dequeue() 
      {
          unique_lock<mutex> lk(mx);
          namespace phx = boost::phoenix;
          cv.wait(lk, phx::ref(shutdown) || !phx::empty(phx::ref(_queue)));
          if (_queue.empty())
              return none;
          job_t job = _queue.front();
          _queue.pop_front();
          return job;
      }
      ~thread_pool()
      {
          shutdown = true;
          {
              lock_guard<mutex> lk(mx);
              cv.notify_all();
          }
          pool.join_all();
      }
};

that的典型用法也在答案中:

static const size_t bignumber = 1 << 20;
class myClass 
{
    thread_pool pool; // uses 1 thread per core
  public:
    void launch_jobs()
    {
        std::cout << "enqueuing jobs... " << std::flush;
        for(size_t i=0; i<bignumber; ++i)
        {
            for(int j=0; j<2; ++j) {
                pool.enqueue(bind(&myClass::myFunction, this, j, i));
            }     
        }
        std::cout << "donen";
    }
  private:
    void myFunction(int i, int j)
    {
        boost::this_thread::sleep_for(boost::chrono::milliseconds(1));
        counter += 1;
    }
};
int main()
{
    myClass instance;
    instance.launch_jobs();
    size_t last = 0;
    while (counter < (2*bignumber))
    {
        boost::this_thread::sleep_for(boost::chrono::milliseconds(100));
        if ((counter >> 4u) > last)
        {
            std::cout << "Progress: " << counter << "/" << (bignumber*2) << "n";
            last = counter >> 4u;
        }
    }
}
对于这个问题,在另一个答案的评论中,我也发布了一个基于无锁作业队列实现的等效解决方案:
  • boost线程抛出异常"thread_resource_error:资源暂时不可用"

这是我的(不完美的)实现:

/**
 * author Christophe Dumeunier
 * brief  Extension of boost::thread_group managing a maximum number of threads running in parallel
 */
class thread_group_max : public boost::thread_group
{
    public:
        /**
         * brief  Instanciate a group for threads
         * param  max_running_threads  Maximum number of threads running in parallel, if 0 use the number of cores
         * param    max_sleeping_time  Maximum sleeping time (seconds) between two checks for finished threads (must be > sleeping_time_start)
         * param   sleeping_time_grow  Coefficient increasing sleeping time while waiting for finished threads (must be > 1)
         * param  sleeping_time_start  Initial sleeping time (must be > 0)
         */
        explicit                   thread_group_max(std::size_t max_running_threads = 0, float max_sleeping_time = 1.0f,
                                                    float sleeping_time_grow = 1.1f, float sleeping_time_start = 0.001f);
        /**
         * brief  Destroy the group
         * note   Doesn't join the unterminated threads
         */
                                   ~thread_group_max();
        /** brief Wait for an available slot and then create a new thread and launch it */
        template<typename F>
        boost::thread*             create_thread(F f);
    private:
        std::size_t                maxRunningThreads;  //!< Maximum number of running threads
        float                      maxSleepingTime;    //!< Maximum sleeping time between two checks for finished threads
        float                      sleepingTimeStart;  //!< Initial sleeping time
        float                      sleepingTimeGrow;   //!< Coefficient increasing sleeping time while waiting for finished threads
        std::set<boost::thread*>   runningThreads;     //!< Pointers to running or finished-but-not-removed-yet threads
};
thread_group_max::thread_group_max(std::size_t max_running_threads, float max_sleeping_time, float sleeping_time_grow, float sleeping_time_start) :
    boost::thread_group(),
    maxRunningThreads(max_running_threads == 0 ? std::max(boost::thread::hardware_concurrency(), 1u) : max_running_threads),
    maxSleepingTime(max_sleeping_time),
    sleepingTimeStart(sleeping_time_start),
    sleepingTimeGrow(sleeping_time_grow),
    runningThreads()
{
    assert(this->maxRunningThreads > 0);
    assert(this->maxSleepingTime >= this->sleepingTimeStart);
    assert(this->sleepingTimeStart > 0.0f);
    assert(this->sleepingTimeGrow > 1.0f);
}
thread_group_max::~thread_group_max()
{}
template<typename F>
boost::thread* thread_group_max::create_thread(F f)
{
    // First, try to clean already finished threads
    if(this->runningThreads.size() >= this->maxRunningThreads)
    {
        for(std::set<boost::thread*>::iterator it = this->runningThreads.begin(); it != this->runningThreads.end();)
        {
            const std::set<boost::thread*>::iterator jt = it++;
            if((*jt)->timed_join(boost::posix_time::milliseconds(0))) /// @todo timed_join is deprecated
                this->runningThreads.erase(jt);
        }
    }
    // If no finished thread found, wait for it
    if(this->runningThreads.size() >= this->maxRunningThreads)
    {
        float sleeping_time = this->sleepingTimeStart;
        do
        {
            boost::this_thread::sleep(boost::posix_time::milliseconds((long int)(1000.0f * sleeping_time)));
            for(std::set<boost::thread*>::iterator it = this->runningThreads.begin(); it != this->runningThreads.end();)
            {
                const std::set<boost::thread*>::iterator jt = it++;
                if((*jt)->timed_join(boost::posix_time::milliseconds(0))) /// @todo timed_join is deprecated
                    this->runningThreads.erase(jt);
            }
            if(sleeping_time < this->maxSleepingTime)
            {
                sleeping_time *= this->sleepingTimeGrow;
                if(sleeping_time > this->maxSleepingTime)
                    sleeping_time = this->maxSleepingTime;
            }
        } while(this->runningThreads.size() >= this->maxRunningThreads);
    }
    // Now, at least 1 slot is available, use it
    return *this->runningThreads.insert(this->boost::thread_group::create_thread(f)).first;
}

使用示例:

thread_group_max group(num_threads);
for(std::size_t i = 0; i < jobs.size(); ++i)
  group.create_thread(boost::bind(&my_run_job_function, boost::ref(job[i])));
group.join_all();

最新更新