Sleeping thread pool: worker threads wake up without a notify() from the main thread



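I'm implementing a thread pool in which the workers sleep when no work is ready, and the main thread sleeps while the workers are busy. I noticed that worker threads keep running after calling wait(), even though the main thread has not called notify_all().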

The output looks like this:

WORKER WAIT
MAIN SETUP
MAIN POLLS READINESS
WORKER WAIT
WORKER WAIT
WORKER AWAKENS!
WORKER START
ALL WORKERS READY
WORKER AWAKENS!
......

Worker function:

void TaskSystemParallelThreadPoolSleeping::waitFunc() {
    std::unique_lock<std::mutex> lock(*this->mutex_);
    while(true) {
        this->num_wait++;
        std::cout << "WORKER WAIT" << std::endl;
        this->cond_->wait(lock,
            std::bind(&TaskSystemParallelThreadPoolSleeping::wakeWorker, this));
        std::cout << "WORKER AWAKENS!" << std::endl;
        if (this->done_flag == true) {
            this->mutex_->unlock();
            break;
        }
        this->mutex_->unlock();
        std::cout << "WORKER START" << std::endl;
        while (true) {
            this->mutex_->lock();
            if (this->not_done == 0) {  // ALL work done
                if (this->total_work != 0) {  // 1st time seen by workers
                    this->total_work = 0;
                    this->num_wait = 0;
                    std::cout << "WORKER WAKE MAIN" << std::endl;
                    this->mutex_->unlock();
                    this->cond_->notify_all();
                }
                this->mutex_->unlock();
                break;
            }
            int total = this->total_work;
            int id = this->work_counter;
            if (id == total) {  // NO work initiated or NO work left
                this->mutex_->unlock();
                continue;
            }
            ++(this->work_counter);  // increment counter
            this->mutex_->unlock();  // Let others access counters to work
            this->runnable->runTask(id, total); // do work
            this->mutex_->lock();
            --(this->not_done); // decrement counter after work done
            this->mutex_->unlock();
        }
        std::cout << "WORKER DONE" << std::endl;
    }
    std::cout << "WORKER TERMINATE" << std::endl;
}

Main thread:

void TaskSystemParallelThreadPoolSleeping::run(IRunnable* runnable, int num_total_tasks) {
    //
    // TODO: CS149 students will modify the implementation of this
    // method in Part A.  The implementation provided below runs all
    // tasks sequentially on the calling thread.
    // Set-up work
    this->mutex_->lock();
    std::cout << "MAIN SETUP" << std::endl;
    this->runnable = runnable;
    this->work_counter = 0;
    this->not_done = num_total_tasks;
    this->total_work = num_total_tasks;
    // Tell workers there is work
    std::cout << "MAIN POLLS READINESS" << std::endl;
    while (this->num_wait < this->num_T) {  // Check if all ready
        this->mutex_->unlock();
        this->mutex_->lock();
    }
    std::cout << "ALL WORKERS READY" << std::endl;
    this->mutex_->unlock();
    this->cond_->notify_all();
    // Wait for workers to complete work
    std::unique_lock<std::mutex> lock(*this->mutex_);
    this->cond_->wait(lock,
        std::bind(&TaskSystemParallelThreadPoolSleeping::wakeMain, this));
    std::cout << "MAIN END" << std::endl;
}

Condition for waking the workers:

bool TaskSystemParallelThreadPoolSleeping::wakeWorker() {
    return (this->done_flag == true ||
            (this->total_work != 0 && this->num_wait == this->num_T));
}

Condition for waking the main thread:

bool TaskSystemParallelThreadPoolSleeping::wakeMain() {
    return this->total_work == 0;
}

Thread pool constructor:

TaskSystemParallelThreadPoolSleeping::TaskSystemParallelThreadPoolSleeping(int num_threads): ITaskSystem(num_threads) {
    //
    // TODO: CS149 student implementations may decide to perform setup
    // operations (such as thread pool construction) here.
    // Implementations are free to add new class member variables
    // (requiring changes to tasksys.h).
    //
    this->num_T = std::max(1, num_threads - 1);
    this->threads = new std::thread[this->num_T];
    this->mutex_ = new std::mutex();
    this->cond_ = new std::condition_variable();
    this->total_work = 0;
    this->not_done = 0;
    this->work_counter = 0;
    this->num_wait = 0;
    this->done_flag = {false};
    for (int i = 0; i < this->num_T; i++) {
        this->threads[i] = std::thread(&TaskSystemParallelThreadPoolSleeping::waitFunc, this);
    }
}

Thread pool destructor:

TaskSystemParallelThreadPoolSleeping::~TaskSystemParallelThreadPoolSleeping() {
    this->done_flag = true;
    this->cond_->notify_all();
    for (int i = 0; i < this->num_T; i++) {
        this->threads[i].join();
    }
    delete this->mutex_;
    delete[] this->threads;
    delete this->cond_;
}

I expected the beginning to look like this:

WORKER WAIT
MAIN SETUP
MAIN POLLS READINESS
WORKER WAIT
WORKER WAIT
ALL WORKERS READY
WORKER AWAKENS!
WORKER START
WORKER AWAKENS!
......

That is, the workers should only wake up after main calls notify_all().

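Edit: Here is the full log. This self-wakeup of the workers seems to cause a deadlock later on: one worker wakes up by itself, finishes all the work, and sets this->num_wait = 0 and this->total_work = 0. As a result, all threads only ever see this->num_wait = 1.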

WORKER WAIT
MAIN SETUP
MAIN POLLS READINESS
WORKER WAIT
WORKER WAIT
WORKER AWAKENS!
WORKER START
ALL WORKERS READY
WORKER AWAKENS!
WORKER START
WORKER AWAKENS!
WORKER START
WORKER WAKE MAIN
WORKER DONE
WORKER WAIT
MAIN ENDWORKER DONE
WORKER WAIT
WORKER DONE
WORKER WAIT
MAIN SETUP
MAIN POLLS READINESS
ALL WORKERS READY
WORKER AWAKENS!
WORKER START
WORKER AWAKENS!
WORKER AWAKENS!
WORKER START
WORKER START
WORKER WAKE MAIN
WORKER DONE
WORKER WAIT
WORKER DONEMAIN END
MAIN SETUP
MAIN POLLS READINESS
WORKER DONE
WORKER WAIT
WORKER WAIT
WORKER AWAKENS!
WORKER START
ALL WORKERS READY
WORKER AWAKENS!
WORKER START
WORKER AWAKENS!
WORKER START
WORKER WAKE MAIN
WORKER DONEWORKER DONE
WORKER WAIT
WORKER WAIT
WORKER DONE
WORKER WAIT
MAIN END
MAIN SETUP
MAIN POLLS READINESS
ALL WORKERS READY
WORKER AWAKENS!
WORKER START
WORKER AWAKENS!
WORKER START
WORKER AWAKENS!
WORKER START
WORKER WAKE MAIN
WORKER DONE
WORKER WAIT
WORKER DONEWORKER DONE
WORKER WAIT
MAIN END
MAIN SETUP
MAIN POLLS READINESS
WORKER WAIT
WORKER AWAKENS!
WORKER START
WORKER WAKE MAIN
WORKER DONE
WORKER WAIT
ALL WORKERS READY
MAIN END
MAIN SETUP
MAIN POLLS READINESS

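The reason the workers "wake up without a notification from the main thread" is straightforward: the increment of this->num_wait and the call to this->cond_->wait happen in the same block, under the same lock, without giving up the lock or notifying the main thread in between. If the wake-up predicate is already true at that point, wait() returns immediately instead of sleeping. The last thread to arrive during main's polling makes num_wait equal to num_T while total_work is already non-zero, so it passes the condition defined in wakeWorker() directly, which is exactly what you observed.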

(I hope this code is only toy code; it has too many problems... Given all the manual mutex lock()/unlock() calls, I'm not surprised that it deadlocks.)

this->num_wait++;
std::cout << "WORKER WAIT" << std::endl;
this->cond_->wait(lock,
    std::bind(&TaskSystemParallelThreadPoolSleeping::wakeWorker, this));
std::cout << "WORKER AWAKENS!" << std::endl;

According to https://en.cppreference.com/w/cpp/thread/condition_variable/wait, the predicate overload of condition_variable::wait() is equivalent to

while (!pred()) {
    wait(lock);
}

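So if pred() already returns true, the loop body never runs: wait() returns immediately, the thread never blocks, and the lock is never released.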
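A small sketch of that equivalence, using a stripped-down copy of the question's state (the function name lastWaiterSkipsSleep and the hard-coded values are hypothetical, chosen to match num_T = 3): the third worker bumps num_wait to num_T while total_work is already non-zero, so the predicate overload of wait() returns at once even though nothing ever calls notify_all().

```cpp
#include <condition_variable>
#include <mutex>

// Hypothetical reduction of the question's state with num_T = 3:
// two workers have already arrived, and main has already set total_work.
bool lastWaiterSkipsSleep() {
    std::mutex m;
    std::condition_variable cv;
    const int num_T = 3;
    int num_wait = 2;
    int total_work = 10;

    std::unique_lock<std::mutex> lock(m);
    ++num_wait;  // the last worker arrives, as in waitFunc()
    // Same condition as wakeWorker() (done_flag omitted). Because it is
    // already true, wait() evaluates it once and returns without blocking;
    // no thread ever calls cv.notify_all().
    cv.wait(lock, [&] { return total_work != 0 && num_wait == num_T; });
    return lock.owns_lock();  // still held: it was never given up
}
```

If the predicate were false here, the call would block forever, since there is no other thread to notify it.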

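One possible way to avoid both the self-wakeup and the manual lock()/unlock() bookkeeping is sketched below. It is a standalone sketch under stated assumptions, not the CS149 ITaskSystem interface: the class name SleepingPool, its run(task, total) signature, and the generation counter are all hypothetical. Workers wait for a generation counter that only run() changes, so no worker can satisfy its own wake-up condition, main sleeps on a separate condition variable, and every lock is a scoped std::unique_lock or std::lock_guard.

```cpp
#include <condition_variable>
#include <functional>
#include <mutex>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical minimal sleeping pool (not the CS149 ITaskSystem interface).
// Workers wait for the generation counter to change, which only run() does,
// so no worker can satisfy its own wake-up condition. All locking is scoped.
class SleepingPool {
public:
    explicit SleepingPool(int num_threads) {
        for (int i = 0; i < num_threads; ++i)
            workers_.emplace_back([this] { workerLoop(); });
    }

    ~SleepingPool() {
        {
            std::lock_guard<std::mutex> lk(mu_);
            done_ = true;
        }
        work_cv_.notify_all();
        for (std::thread& t : workers_) t.join();
    }

    // Runs task(id, total) for every id in [0, total) and blocks until all
    // of them have finished.
    void run(std::function<void(int, int)> task, int total) {
        std::unique_lock<std::mutex> lk(mu_);
        task_ = std::move(task);
        total_ = total;
        next_ = 0;
        remaining_ = total;
        ++generation_;  // publish a new batch
        work_cv_.notify_all();
        // Main sleeps on its own condition variable until the batch drains.
        done_cv_.wait(lk, [this] { return remaining_ == 0; });
    }

private:
    void workerLoop() {
        unsigned long seen = 0;  // last generation this worker picked up
        std::unique_lock<std::mutex> lk(mu_);
        for (;;) {
            // Sleep until run() publishes a new batch or the pool shuts down.
            work_cv_.wait(lk, [&] { return done_ || generation_ != seen; });
            if (done_) return;
            seen = generation_;
            while (next_ < total_) {
                int id = next_++;
                int total = total_;
                lk.unlock();  // run the task without holding the lock
                task_(id, total);
                lk.lock();
                if (--remaining_ == 0) done_cv_.notify_one();
            }
        }
    }

    std::mutex mu_;
    std::condition_variable work_cv_;  // workers sleep here
    std::condition_variable done_cv_;  // run() sleeps here
    std::vector<std::thread> workers_;
    std::function<void(int, int)> task_;
    int total_ = 0;
    int next_ = 0;
    int remaining_ = 0;
    unsigned long generation_ = 0;
    bool done_ = false;
};
```

In this shape, task(id, total) plays the role of runnable->runTask(id, total). The num_wait readiness handshake and the busy polling in run() go away, because both batch publication and shutdown are plain state that the wait() predicates check, so a late or spurious wakeup simply re-checks and goes back to sleep.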