如何在工作线程完成时安全地终止它们?



我试图使用c++ 11同步特性实现一个主工作模型作为实践。该模型使用一个std::queue对象以及一个条件变量和一些互斥锁。主线程将任务放入队列,工作线程将任务从队列中弹出并"处理"它们。

当我不终止工作线程时,我的代码可以正常工作(除非我错过了一些竞争条件)。然而,程序永远不会结束,直到你用Ctrl+C手动终止它。我有一些代码终止工人后,主线程完成。不幸的是,这不能正常工作,因为它会在某些执行运行中跳过最后一个任务。

我的问题是:是否有可能在处理完所有任务后安全正确地终止工作线程?

这只是一个概念的证明,我不熟悉c++ 11的特性,所以我为我的风格道歉。我感谢任何建设性的批评。

EDIT: nogard友善地指出,这个模型的实现使它变得相当复杂,并告诉我,我所要求的是毫无意义的,因为一个好的实现不会有这个问题。线程池是正确实现这一点的一种方式。此外,我应该使用一个std::原子而不是一个正常的布尔值为worker_done(感谢Jarod42)。

#include <iostream>
#include <sstream>
#include <string>
#include <thread>
#include <mutex>
#include <queue>
#include <condition_variable>
//To sleep
#include <unistd.h>
struct Task
{
    int taskID;
};
typedef struct Task task;

//cout mutex
std::mutex printstream_accessor;
//queue related objects
std::queue<task> taskList;
std::mutex queue_accessor;
std::condition_variable cv;
//worker flag
bool worker_done = false;
//It is acceptable to call this on a lock only if you poll - you will get an inaccurate answer otherwise
//Will return true if the queue is empty, false if not
bool task_delegation_eligible()
{
    return taskList.empty();
}
//Thread safe cout function
void safe_cout(std::string input)
{
    // Apply a stream lock and state the calling thread information then print the input
    std::unique_lock<std::mutex> cout_lock(printstream_accessor);   
    std::cout << "Thread:" << std::this_thread::get_id() << " " << input << std::endl;
}//cout_lock destroyed, therefore printstream_accessor mutex is unlocked
void worker_thread()
{
    safe_cout("worker_thread() initialized");
    while (!worker_done)
    {
        task getTask;
        {
            std::unique_lock<std::mutex> q_lock(queue_accessor);
            cv.wait(q_lock, 
            []
                {   //predicate that will check if available
                    //using a lambda function to apply the ! operator
                    if (worker_done)
                        return true;
                    return !task_delegation_eligible();
                }
            );
            if (!worker_done)
            {
                //Remove task from the queue
                getTask = taskList.front();
                taskList.pop();
            }
        }
        if (!worker_done)
        {
            //process task
            std::string statement = "Processing TaskID:";
            std::stringstream convert;
            convert << getTask.taskID;
            statement += convert.str();
            //print task information
            safe_cout(statement);
            //"process" task
            usleep(5000);
        } 
    }
}
/**
 * master_thread(): 
 * This thread is responsible for creating task objects and pushing them onto the queue
 * After this, it will notify all other threads who are waiting to consume data
 */
void master_thread()
{
    safe_cout("master_thread() initialized");
    for (int i = 0; i < 10; i++)
    {
        //Following 2 lines needed if you want to don't want this thread to bombard the queue with tasks before processing of a task can be done
        while (!task_delegation_eligible() )    //task_eligible() is true IFF queue is empty
            std::this_thread::yield();          //yield execution to other threads (if there are tasks on the queue)
        //create a new task
        task newTask;
        newTask.taskID = (i+1);
        //lock the queue then push
        {
            std::unique_lock<std::mutex> q_lock(queue_accessor);
            taskList.push(newTask);
        }//unique_lock destroyed here
        cv.notify_one();
    }
    safe_cout("master_thread() complete");
}
int main(void)
{
    std::thread MASTER_THREAD(master_thread);   //create a thread object named MASTER_THREAD and have it run the function master_thread()
    std::thread WORKER_THREAD_1(worker_thread);
    std::thread WORKER_THREAD_2(worker_thread);
    std::thread WORKER_THREAD_3(worker_thread);
    MASTER_THREAD.join();
    //wait for the queue tasks to finish
    while (!task_delegation_eligible());    //wait if the queue is full
    /** 
     * Following 2 lines
     * Terminate worker threads => this doesn't work as expected.
     * The model is fine as long as you don't try to stop the worker 
     * threads like this as it might skip a task, however this program
     * will terminate
     */
    worker_done = true;
    cv.notify_all();
    WORKER_THREAD_1.join();
    WORKER_THREAD_2.join();
    WORKER_THREAD_3.join();
    return 0;
}

Thanks to lot

程序中存在可见性问题:在一个线程中更改worker_done标志可能不会被工作线程观察到。为了保证一个操作的结果可以被第二个操作观察到,那么你必须使用某种形式的同步来确保第二个线程看到第一个线程做了什么。要解决这个问题,你可以使用Jarod42建议的atomic。

如果您做这个程序是为了练习,它是好的,但对于实际应用程序,您可以从现有的线程池中获益,这将大大简化您的代码。

最新更新