为什么std::atomic_flag::notify_one()无法在两个线程之间解除std::atomic_flag::wait()的阻塞



我正在Windows 10 64位操作系统上,使用MinGW64的gcc 11.2.0学习C++20中的std::atomic_flag。

编译器设置如下:

g++.exe -Wall -fexceptions -g -std=gnu++20 -O3 -m64

我研究了《Two Atomic Flags》中的乒乓球例子。

我运行了示例代码,但是遇到了死锁问题。

示例代码如下:

// pingPongAtomicFlags.cpp
#include <iostream>
#include <atomic>
#include <thread>
// Handshake flags: ping() blocks on condAtomicFlag1, pong() blocks on condAtomicFlag2.
// Each side clears its own flag after waking and sets the other side's flag.
std::atomic_flag condAtomicFlag1{};
std::atomic_flag condAtomicFlag2{};
// Turn counter; incremented once per loop iteration by ping() and read by both loops.
std::atomic<int> counter{};
// Both loops stop iterating once counter is no longer below this bound.
constexpr int countlimit = 1'000'000;
// Ping half of the exchange: waits for its own flag, counts the turn,
// then hands the ball over to pong() through condAtomicFlag2.
void ping() {
    for (;;) {
        if (counter.load() >= countlimit) {
            return;
        }

        // (1) Block for as long as condAtomicFlag1 is still clear.
        condAtomicFlag1.wait(false);
        // (2) Consume the signal so the next wait blocks again.
        condAtomicFlag1.clear();
        counter.fetch_add(1);

        // (4) Raise pong's flag, then (3) wake a thread waiting on it.
        condAtomicFlag2.test_and_set();
        condAtomicFlag2.notify_one();
    }
}
// Pong half of the exchange: waits for condAtomicFlag2, then returns the
// ball to ping() through condAtomicFlag1. It never modifies counter itself.
void pong() {
    for (;;) {
        if (counter.load() >= countlimit) {
            return;
        }

        // Block for as long as condAtomicFlag2 is still clear, then consume the signal.
        condAtomicFlag2.wait(false);
        condAtomicFlag2.clear();

        // Raise ping's flag and wake a thread waiting on it.
        condAtomicFlag1.test_and_set();
        condAtomicFlag1.notify_one();
    }
}
// Runs the ping-pong exchange on two threads and prints the elapsed wall time.
int main() {
    // steady_clock is monotonic, so the measured interval cannot be distorted
    // by adjustments of the system wall clock while the threads run.
    const auto start = std::chrono::steady_clock::now();

    // (5) Serve the first ball: ping() may pass its first wait immediately.
    condAtomicFlag1.test_and_set();

    std::thread t1(ping);
    std::thread t2(pong);
    t1.join();
    t2.join();

    const std::chrono::duration<double> dur = std::chrono::steady_clock::now() - start;
    std::cout << "Duration: " << dur.count() << " seconds" << std::endl;
}

这个死锁应该是由于线程阻塞在condAtomicFlag1.wait(false)和condAtomicFlag2.wait(false)上造成的。

为了了解死锁的原因,我在notify_one()之后加入了atom_notifyAck_flag1和atom_notifyAck_flag2来监视其行为,并使用thr_notifyAck_monitor_flag1和thr_notifyAck_monitor_flag2两个线程分别监视atom_notifyAck_flag1和atom_notifyAck_flag2。

void pong()中的atom_notifyAck_flag1将在condAtomicFlag1.notify_one()之后被设置为false。在void ping()中,condAtomicFlag1.wait(false)收到通知后会解除阻塞,然后将atom_notifyAck_flag1设置为true。atom_notifyAck_flag2的行为与atom_notifyAck_flag1相似。

在我的计算机上,atom_notifyAck_flag应该在0.0001ms内从false更改为true。如果在sleep_for(10ms)之后atom_notifyAck_flag仍然被监控为false,我就将notify_one()视为失败,并触发新的notify_one()来解除wait()的阻塞。

thrf_notifyAck_monitor()的示例如下:

// pingPongAtomicFlags.cpp
#include <iostream>
#include <atomic>
#include <thread>
// Handshake flags: ping() blocks on condAtomicFlag1, pong() blocks on condAtomicFlag2.
std::atomic_flag condAtomicFlag1{ATOMIC_FLAG_INIT};
std::atomic_flag condAtomicFlag2{ATOMIC_FLAG_INIT};
// Acknowledgement flags watched by the monitor threads: the notifying side stores
// false right after notify_one(), the woken side stores true once its wait() returned.
std::atomic<bool> atom_notifyAck_flag1 {false};
std::atomic<bool> atom_notifyAck_flag2 {false};
// Turn counter; incremented once per loop iteration by ping() and read by both loops.
std::atomic<int> counter{};
// Both loops stop iterating once counter is no longer below this bound.
constexpr int countlimit = 1'000'000;
// Brings the chrono literal suffixes (e.g. 10ms) into scope.
using namespace std::literals;
// Ping half of the instrumented exchange. In addition to the plain handshake it
// records in atom_notifyAck_flag1 that its own wait() returned, and resets
// atom_notifyAck_flag2 right after notifying pong().
void ping()
{
    // Thanks to Nate Eldredge for the reminder of edge condition of race.
    for ( ;; )
    {
        if ( counter.load() >= countlimit )
        {
            return;
        }

        // (1) Block for as long as condAtomicFlag1 is still clear.
        condAtomicFlag1.wait ( false );
        // (2) Consume the signal so the next wait blocks again.
        condAtomicFlag1.clear();
        // Acknowledge: the notification on flag 1 was received.
        atom_notifyAck_flag1.store ( true );
        counter.fetch_add ( 1 );

        // (4) Raise pong's flag, then (3) wake a thread waiting on it.
        condAtomicFlag2.test_and_set();
        condAtomicFlag2.notify_one();
        // Pending: pong() has not yet acknowledged this notification.
        atom_notifyAck_flag2.store ( false );
    }
}
// Pong half of the instrumented exchange. Mirrors ping(): acknowledges its own
// wake-up in atom_notifyAck_flag2 and resets atom_notifyAck_flag1 after notifying.
void pong()
{
    for ( ;; )
    {
        if ( counter.load() >= countlimit )
        {
            return;
        }

        // Block for as long as condAtomicFlag2 is still clear, then consume the signal.
        condAtomicFlag2.wait ( false );
        condAtomicFlag2.clear();
        // Acknowledge: the notification on flag 2 was received.
        atom_notifyAck_flag2.store ( true );

        // Raise ping's flag and wake a thread waiting on it.
        condAtomicFlag1.test_and_set();
        condAtomicFlag1.notify_one();
        // Pending: ping() has not yet acknowledged this notification.
        atom_notifyAck_flag1.store ( false );
    }
}
void thrf_notifyAck_monitor ( const std::atomic<bool>& last_notifyAck_check
                              , std::atomic<bool>& atom_notifyAck_Flag
                              , int flagId )
{
    int atom_notifyAck_Flag_failCounter {};
    while ( !last_notifyAck_check )
    {
        if ( !atom_notifyAck_Flag ) //atom_notifyAck_Flag is false
        {
            std::this_thread::sleep_for ( 10ms ); //wait 10ms and check again
            if ( !atom_notifyAck_Flag )
                //atom_notifyAck_Flag is false for 10ms
            {
                std::cout << "atom_notifyAck_Flag_"
                          << flagId
                          << " failed [" << atom_notifyAck_Flag_failCounter
                          << "] times "
                          << "at counter "
                          << counter << "n";
                ++atom_notifyAck_Flag_failCounter;
                condAtomicFlag1.notify_all();
            }
        }
    }
}
// Runs the instrumented ping-pong exchange together with one monitor thread per
// acknowledgement flag, then prints the final counter and the elapsed time.
int main()
{
    std::cout << "start ping pong ...\n";
    // steady_clock is monotonic, so the measured interval cannot be distorted
    // by adjustments of the system wall clock while the threads run.
    const auto start = std::chrono::steady_clock::now();

    // (5) Serve the first ball: ping() may pass its first wait immediately.
    condAtomicFlag1.test_and_set();
    std::thread t1 ( ping );
    std::thread t2 ( pong );

    // Stop signal shared by both monitor threads.
    std::atomic<bool> last_notifyAck_check {false};
    // Thanks to Ted Lyngmo for reminder
    std::thread thr_notifyAck_monitor_flag1 ( thrf_notifyAck_monitor
            , std::ref ( last_notifyAck_check )
            , std::ref ( atom_notifyAck_flag1 ), 1 );
    std::thread thr_notifyAck_monitor_flag2 ( thrf_notifyAck_monitor
            , std::ref ( last_notifyAck_check )
            , std::ref ( atom_notifyAck_flag2 ), 2 );

    t1.join();
    t2.join();
    // Players are done; let the monitors leave their polling loops.
    last_notifyAck_check = true;
    thr_notifyAck_monitor_flag1.join();
    thr_notifyAck_monitor_flag2.join();

    std::cout << "\n";
    std::cout << "counter " << counter << "\n";
    const std::chrono::duration<double> dur = std::chrono::steady_clock::now() - start;
    std::cout << "Duration: " << dur.count() << " seconds" << std::endl << "\n";
}

监视器线程工作。

结果消息如下:

start ping pong ...
atom_notifyAck_Flag_2 failed [0] times at counter 284
atom_notifyAck_Flag_1 failed [0] times at counter 713
atom_notifyAck_Flag_2 failed [1] times at counter 1145
atom_notifyAck_Flag_1 failed [1] times at counter 4128
atom_notifyAck_Flag_1 failed [2] times at counter 5519
atom_notifyAck_Flag_1 failed [3] times at counter 28465
atom_notifyAck_Flag_1 failed [4] times at counter 28812
atom_notifyAck_Flag_2 failed [2] times at counter 35854
atom_notifyAck_Flag_2 failed [3] times at counter 54880
atom_notifyAck_Flag_1 failed [5] times at counter 55227
atom_notifyAck_Flag_2 failed [4] times at counter 65113
atom_notifyAck_Flag_1 failed [6] times at counter 65519
atom_notifyAck_Flag_1 failed [7] times at counter 78369
atom_notifyAck_Flag_1 failed [8] times at counter 89895
atom_notifyAck_Flag_2 failed [5] times at counter 90408
atom_notifyAck_Flag_1 failed [9] times at counter 90872
atom_notifyAck_Flag_1 failed [10] times at counter 115252
atom_notifyAck_Flag_1 failed [11] times at counter 121390
atom_notifyAck_Flag_2 failed [6] times at counter 149745
atom_notifyAck_Flag_2 failed [7] times at counter 275242
atom_notifyAck_Flag_2 failed [8] times at counter 276173
atom_notifyAck_Flag_1 failed [12] times at counter 304177
atom_notifyAck_Flag_2 failed [9] times at counter 417666
atom_notifyAck_Flag_2 failed [10] times at counter 421162
atom_notifyAck_Flag_2 failed [11] times at counter 455784
atom_notifyAck_Flag_2 failed [12] times at counter 466523
atom_notifyAck_Flag_2 failed [13] times at counter 470350
atom_notifyAck_Flag_1 failed [13] times at counter 485684
atom_notifyAck_Flag_2 failed [14] times at counter 486125
atom_notifyAck_Flag_1 failed [14] times at counter 486566
atom_notifyAck_Flag_2 failed [15] times at counter 492096
atom_notifyAck_Flag_2 failed [16] times at counter 515809
atom_notifyAck_Flag_1 failed [15] times at counter 516158
atom_notifyAck_Flag_1 failed [16] times at counter 516509
atom_notifyAck_Flag_1 failed [17] times at counter 516927
atom_notifyAck_Flag_2 failed [17] times at counter 517392
atom_notifyAck_Flag_2 failed [18] times at counter 517840
atom_notifyAck_Flag_2 failed [19] times at counter 518319
atom_notifyAck_Flag_2 failed [20] times at counter 518810
atom_notifyAck_Flag_2 failed [21] times at counter 519290
atom_notifyAck_Flag_2 failed [22] times at counter 541904
atom_notifyAck_Flag_1 failed [18] times at counter 542383
atom_notifyAck_Flag_2 failed [23] times at counter 543410
atom_notifyAck_Flag_1 failed [19] times at counter 544379
atom_notifyAck_Flag_2 failed [24] times at counter 544876
atom_notifyAck_Flag_2 failed [25] times at counter 545324
atom_notifyAck_Flag_2 failed [26] times at counter 566521
atom_notifyAck_Flag_2 failed [27] times at counter 588983
atom_notifyAck_Flag_2 failed [28] times at counter 589465
atom_notifyAck_Flag_1 failed [20] times at counter 592211
atom_notifyAck_Flag_1 failed [21] times at counter 611563
atom_notifyAck_Flag_2 failed [29] times at counter 614010
atom_notifyAck_Flag_1 failed [22] times at counter 619736
atom_notifyAck_Flag_2 failed [30] times at counter 642781
atom_notifyAck_Flag_1 failed [23] times at counter 651436
atom_notifyAck_Flag_1 failed [24] times at counter 772356
atom_notifyAck_Flag_1 failed [25] times at counter 772738
atom_notifyAck_Flag_2 failed [31] times at counter 773207
atom_notifyAck_Flag_1 failed [26] times at counter 773691
atom_notifyAck_Flag_1 failed [27] times at counter 774162
atom_notifyAck_Flag_1 failed [28] times at counter 774575
atom_notifyAck_Flag_2 failed [32] times at counter 864664
atom_notifyAck_Flag_2 failed [33] times at counter 986915
atom_notifyAck_Flag_1 failed [29] times at counter 1000000
counter 1000000
Duration: 1.09015 seconds

Process returned 0 (0x0)   execution time : 1.126 s

在这个测试中,它显示wait()并不总是正确地接收notify_one,所以它一直等待并被阻塞。但是,std::atomic_flag::notify_one()提供了原子行为,所以不应该发生碰撞

notify_one()使用这种非阻塞函数是否容易失效?我能做些什么来避免这种死锁?

程序末尾有一个竞争,可能会导致死锁。

假设我们得到以下排序,从counter == 999999开始:

ping()                                      pong()
======                                      ======
condAtomicFlag1.wait(false); // waiting
                                            condAtomicFlag1.test_and_set();
                                            condAtomicFlag1.notify_one();
// wakes up
condAtomicFlag1.clear();
++counter; // counter = 1000000
                                            while(counter < countlimit) // false
                                            // pong() returns
condAtomicFlag2.test_and_set();
condAtomicFlag2.notify_one();  // nobody listening
while(counter <= countlimit) { // true
condAtomicFlag1.wait(false);   // never becomes true, deadlock

我还没有仔细考虑过正确的修复方法,但也许你可以。

在我的测试中,这是唯一真正的死锁。来自监视器的所有其他通知看起来都是假的,可能是因为系统调度线程的时间超过了10毫秒。如果我将超时时间设置为1秒,那么除了"计数器1000000"之外,所有的通知都将消失。

基于一个假设,atomFlag_pong.notify_one()可能由于某种原因在我的系统中被丢弃,因此atomFlag_pong.wait(false)只能永远等待。

为了确认atomFlag_pong.notify_one()成功到达atomFlag_pong.wait(false),我在一个额外的while循环中检查atomFlag_pong.test()作为确认,并不断发送atomFlag_pong.notify_one()直到atomFlag_pong.test()==false,这意味着atomFlag_pong.wait(false)成功通过。在第二个while循环中,必须再次检查&& counter < countLimit条件,因为counter可以在第二个while循环中被线程thr_ping修改。

反过来,在另一个方向上,我使用类似的确认技巧来确认atomFlag_ping.notify_one()成功到达。

经过测试,它最终没有死锁。

我的示例代码如下:

// pingPongAtomicFlags.cpp
#include <iostream>
#include <atomic>
#include <thread>
// Handshake flags: thrf_ping() blocks on atomFlag_ping, thrf_pong() blocks on atomFlag_pong.
std::atomic_flag atomFlag_ping {};
std::atomic_flag atomFlag_pong {};
// Turn counter; incremented once per loop iteration by thrf_ping() and read by both loops.
std::atomic<int> counter{};
// Both loops stop iterating once counter is no longer below this bound.
constexpr int countLimit = 1'000'000;
// Brings the standard literal suffixes into scope.
using namespace std::literals;
// Ping half of the exchange with acknowledged notification.
//
// Each turn: wait for atomFlag_ping, clear it, count the turn, set atomFlag_pong
// and keep notifying until thrf_pong() has cleared atomFlag_pong again or the
// turn limit has been reached.
void thrf_ping()
{
    while ( counter < countLimit )
    {
        atomFlag_ping.wait ( false );
        // Clearing marks that atomFlag_ping.wait(false) has been passed.
        atomFlag_ping.clear();
        ++counter;
        atomFlag_pong.test_and_set();   // hand the ball to pong

        // Always notify at least once after setting the flag. On the final turn
        // counter already equals countLimit, so a plain while loop would skip the
        // notification entirely and thrf_pong() could stay blocked in wait(false)
        // forever. After the first notify, keep re-sending only while pong has not
        // yet cleared the flag and the limit has not been reached.
        do
        {
            atomFlag_pong.notify_one();
        }
        while ( atomFlag_pong.test() && counter < countLimit );
    }
}
// Pong half of the exchange with acknowledged notification.
//
// Each turn: wait for atomFlag_pong, clear it, set atomFlag_ping and repeat
// notify_one() for as long as atomFlag_ping still reads true and the turn
// limit has not been reached.
void thrf_pong()
{
    for ( ;; )
    {
        if ( counter.load() >= countLimit )
        {
            return;
        }

        atomFlag_pong.wait ( false );
        // Clearing marks that atomFlag_pong.wait(false) has been passed.
        atomFlag_pong.clear();
        atomFlag_ping.test_and_set();

        // counter is re-read on every pass because thrf_ping() may have
        // advanced it in the meantime. A cleared atomFlag_ping means ping's
        // wait(false) has been passed, so the exchange can move on.
        while ( atomFlag_ping.test() && counter.load() < countLimit )
        {
            atomFlag_ping.notify_one();
        }
    }
}
// Runs the acknowledged ping-pong exchange on two threads, then prints the
// final counter and the elapsed time.
int main()
{
    std::cout << "start ping pong ...\n";
    // steady_clock is monotonic, so the measured interval cannot be distorted
    // by adjustments of the system wall clock while the threads run.
    const auto start = std::chrono::steady_clock::now();

    // Drop the first ball: thrf_ping() may pass its first wait immediately.
    atomFlag_ping.test_and_set();
    std::thread thr_ping ( thrf_ping );
    std::thread thr_pong ( thrf_pong );
    thr_ping.join();
    thr_pong.join();

    std::cout << "\n";
    std::cout << "counter " << counter << "\n";
    const std::chrono::duration<double> dur = std::chrono::steady_clock::now() - start;
    std::cout << "Duration: " << dur.count() << " seconds" << std::endl << "\n";
}

最新更新