C++条件变量,用于发出分离线程执行暂停结束的信号



我有一些代码正在处理,其中派生了一个分离的线程,完成了一些工作,然后应该等待来自main()的信号,然后再向main发送另一个信号,指示线程已经退出。

我对条件变量还相当陌生,不过我以前处理过一些多线程代码。(主要是互斥。(

这是我试图实现的,但它并没有按照我预期的方式运行。(可能我误解了什么。(

这背后的想法是将一个包含两个标志的结构传递给每个分离的线程。第一个标志表示CCD_ 2表示";退出并放下线程函数的末尾"是可以的;。第二个标志由线程本身设置,并向main()发出线程确实已退出的信号。(这只是为了确认来自main()的信号接收正常,并发回一些东西。(

#include <cstdlib> // std::atoi
#include <iostream>
#include <thread>
#include <vector>
#include <random>
#include <future>
#include <condition_variable>
#include <mutex>
struct ThreadStruct
{
int id;
std::condition_variable cv;
std::mutex m;
int ok_to_exit;
int exit_confirm;
};

void Pause()
{
std::cout << "Press enter to continue" << std::endl;
std::cin.get();
}

void detachedThread(ThreadStruct* threadData)
{
std::cout << "START: Detached Thread " << threadData->id << std::endl;

// Performs some arbitrary amount of work.
for(int i = 0; i < 100000; ++ i);
std::cout << "FINISH: Detached thread " << threadData->id << std::endl;

std::unique_lock<std::mutex> lock(threadData->m);
std::cout << "WAIT: Detached thread " << threadData->id << std::endl;
threadData->cv.wait(lock, [threadData]{return threadData->ok_to_exit == 1;});
std::cout << "EXIT: Detached thread " << threadData->id << std::endl;
threadData->exit_confirm = 1;
}
int main(int argc, char** argv)
{

int totalThreadCount = 1;
ThreadStruct* perThreadData = new ThreadStruct[totalThreadCount];
std::cout << "Main thread starting " << totalThreadCount << " thread(s)" << std::endl;
for(int i = totalThreadCount - 1; i >= 0; --i)
{
perThreadData[i].id = i;
perThreadData[i].ok_to_exit = 0;
perThreadData[i].exit_confirm = 0;

std::thread t(detachedThread, &perThreadData[i]);
t.detach();

}

for(int i{0}; i < totalThreadCount; ++i)
{
ThreadStruct *threadData = &perThreadData[i];

std::cout << "Waiting for lock - main() thread" << std::endl;
std::unique_lock<std::mutex> lock(perThreadData[i].m);
std::cout << "Lock obtained - main() thread" << std::endl;
perThreadData[i].cv.wait(lock);
threadData->ok_to_exit = 1;
// added after comment from Sergey
threadData->cv.notify_all(); 
std::cout << "Done - main() thread" << std::endl;

}
for(int i{0}; i < totalThreadCount; ++i)
{
std::size_t thread_index = i;
ThreadStruct& threadData = perThreadData[thread_index];

std::unique_lock<std::mutex> lock(threadData.m);
std::cout << "i=" << i << std::endl;
int &exit_confirm = threadData.exit_confirm;
threadData.cv.wait(lock, [exit_confirm]{return exit_confirm == 1;});
std::cout << "i=" << i << " finished!" << std::endl;
}
Pause();
return 0;
}

这条线路:

WAIT: Detached thread 0

但分离的线程永远不会停止。我做错了什么?

编辑:进一步的实验-这有帮助吗

我认为通过删除一个步骤来简化事情可能会有所帮助。在下面的示例中,main()不会向分离的线程发送信号,它只是等待来自分离线程的信号。

但是,这个代码再次挂起-在打印DROP之后。。。这意味着分离的线程可以正常退出,但main()对此一无所知

#include <cstdlib> // std::atoi
#include <iostream>
#include <thread>
#include <vector>
#include <random>
#include <future>
#include <condition_variable>
#include <mutex>
struct ThreadStruct
{
int id;
std::condition_variable cv;
std::mutex m;
int ok_to_exit;
int exit_confirm;
};

void Pause()
{
std::cout << "Press enter to continue" << std::endl;
std::cin.get();
}

void detachedThread(ThreadStruct* threadData)
{
std::cout << "START: Detached Thread " << threadData->id << std::endl;

// Performs some arbitrary amount of work.
for(int i = 0; i < 100000; ++ i);
std::cout << "FINISH: Detached thread " << threadData->id << std::endl;

std::unique_lock<std::mutex> lock(threadData->m);

std::cout << "EXIT: Detached thread " << threadData->id << std::endl;
threadData->exit_confirm = 1;
threadData->cv.notify_all();
std::cout << "DROP" << std::endl;
}
int main(int argc, char** argv)
{

int totalThreadCount = 1;
ThreadStruct* perThreadData = new ThreadStruct[totalThreadCount];
std::cout << "Main thread starting " << totalThreadCount << " thread(s)" << std::endl;
for(int i = totalThreadCount - 1; i >= 0; --i)
{
perThreadData[i].id = i;
perThreadData[i].ok_to_exit = 0;
perThreadData[i].exit_confirm = 0;

std::thread t(detachedThread, &perThreadData[i]);
t.detach();

}
for(int i{0}; i < totalThreadCount; ++i)
{
std::size_t thread_index = i;
ThreadStruct& threadData = perThreadData[thread_index];

std::cout << "Waiting for mutex" << std::endl;
std::unique_lock<std::mutex> lock(threadData.m);
std::cout << "i=" << i << std::endl;
int &exit_confirm = threadData.exit_confirm;
threadData.cv.wait(lock, [exit_confirm]{return exit_confirm == 1;});
std::cout << "i=" << i << " finished!" << std::endl;
}
Pause();
return 0;
}

您的lambda通过值捕获,因此它永远看不到对exit_confim所做的更改。

通过引用捕获

int& exit_confirm = threadData.exit_confirm;
threadData.cv.wait(lock, [&exit_confirm] { return exit_confirm == 1; });
//                        ^
//                        | capture by-reference

你还需要delete[]你的new[],所以做

delete[] ThreadStruct;

当你们用完structs.


我还注意到免费后的一些堆使用情况,但当我对代码进行一些简化时,这种情况神奇地消失了。我没有进一步调查。

一些建议:

  • 将代码移动到处理ThreadStruct成员变量和锁的ThreadStruct类中。它通常使阅读和维护更简单
  • 删除未使用的变量和标头
  • 不要使用new[]/delete[]。对于本例,您可以使用std::vector<ThreadStruct>
  • 根本不要使用detach()——我在下面还没有做任何相关的工作,但我建议使用join()(在连接的线程上(来进行最后的同步。这就是它的作用
#include <condition_variable>
#include <iostream>
#include <mutex>
#include <thread>
#include <vector>
struct ThreadStruct {
int id;
// move this function into the ThreadStruct class
void detachedThread() {
std::cout << "START: Detached Thread " << id << std::endl;
// Performs some arbitrary amount of work (optimized away here)
std::cout << "FINISH: Detached thread " << id << std::endl;
std::lock_guard<std::mutex> lock(m);
std::cout << "EXIT: Detached thread " << id << std::endl;
exit_confirm = 1;
cv.notify_all();
std::cout << "DROP" << std::endl;
}
// add support functions instead of doing these things in your normal code
void wait_for_exit_confirm() {
std::unique_lock<std::mutex> lock(m);
cv.wait(lock, [this] { return exit_confirm == 1; });
}
void spawn_detached() {
std::thread(&ThreadStruct::detachedThread, this).detach();
}
private:
std::condition_variable cv;
std::mutex m;
int exit_confirm = 0;           // initialize
};

有了以上内容,main变得更干净了:

int main() {
int totalThreadCount = 1;
std::vector<ThreadStruct> perThreadData(totalThreadCount);
std::cout << "Main thread starting " << perThreadData.size() << " thread(s)n";
int i = 0;
for(auto& threadData : perThreadData) {
threadData.id = i++;
threadData.spawn_detached();
}
for(auto& threadData : perThreadData) {
std::cout << "Waiting for mutex" << std::endl;
std::cout << "i=" << threadData.id << std::endl;
threadData.wait_for_exit_confirm();
std::cout << "i=" << threadData.id << " finished!" << std::endl;
}
std::cout << "Press enter to continue" << std::endl;
std::cin.get();
}

为了未来的兴趣:修复了问题中发布的原始MWE。有两个问题

  • 未通过引用捕获lambda中的局部变量(请参阅其他答案(

  • 1main()0调用过多

    #include <cstdlib> // std::atoi
    #include <iostream>
    #include <thread>
    #include <vector>
    #include <random>
    #include <future>
    #include <condition_variable>
    #include <mutex>
    struct ThreadStruct
    {
    int id;
    std::condition_variable cv;
    std::mutex m;
    int ok_to_exit;
    int exit_confirm;
    };
    
    void Pause()
    {
    std::cout << "Press enter to continue" << std::endl;
    std::cin.get();
    }
    
    void detachedThread(ThreadStruct* threadData)
    {
    std::cout << "START: Detached Thread " << threadData->id << std::endl;
    // Performs some arbitrary amount of work.
    for (int i = 0; i < 100000; ++i);
    std::cout << "FINISH: Detached thread " << threadData->id << std::endl;
    std::unique_lock<std::mutex> lock(threadData->m);
    std::cout << "WAIT: Detached thread " << threadData->id << std::endl;
    threadData->cv.wait(lock, [&threadData]{return threadData->ok_to_exit == 1;});
    std::cout << "EXIT: Detached thread " << threadData->id << std::endl;
    threadData->exit_confirm = 1;
    threadData->cv.notify_all();
    std::cout << "DROP" << std::endl;
    }
    int main(int argc, char** argv)
    {
    int totalThreadCount = 1;
    ThreadStruct* perThreadData = new ThreadStruct[totalThreadCount];
    std::cout << "Main thread starting " << totalThreadCount << " thread(s)" << std::endl;
    for (int i = totalThreadCount - 1; i >= 0; --i)
    {
    perThreadData[i].id = i;
    perThreadData[i].ok_to_exit = 0;
    perThreadData[i].exit_confirm = 0;
    std::thread t(detachedThread, &perThreadData[i]);
    t.detach();
    }
    for(int i{0}; i < totalThreadCount; ++ i)
    {
    ThreadStruct *threadData = &perThreadData[i];
    std::cout << "Waiting for lock - main() thread" << std::endl;
    std::unique_lock<std::mutex> lock(perThreadData[i].m);
    std::cout << "Lock obtained - main() thread" << std::endl;
    //perThreadData[i].cv.wait(lock, [&threadData]{return threadData->ok_to_exit == 1;});
    std::cout << "Wait complete" << std::endl;
    threadData->ok_to_exit = 1;
    threadData->cv.notify_all();
    std::cout << "Done - main() thread" << std::endl;
    }
    for (int i{ 0 }; i < totalThreadCount; ++i)
    {
    std::size_t thread_index = i;
    ThreadStruct& threadData = perThreadData[thread_index];
    std::cout << "Waiting for mutex" << std::endl;
    std::unique_lock<std::mutex> lock(threadData.m);
    std::cout << "i=" << i << std::endl;
    int& exit_confirm = threadData.exit_confirm;
    threadData.cv.wait(lock, [&exit_confirm] {return exit_confirm == 1; });
    std::cout << "i=" << i << " finished!" << std::endl;
    }
    Pause();
    return 0;
    }
    

最新更新