在已完成的std::线程连接上发生死锁



我目前有一个关于加入一个完成std::线程的问题。我有一个简单的消费者类:

//Consumer.h
class Consumer
{
public:
    //Ctor, Dtor
    void Start();
    void Release();

private:
    void Run();
    std::atomic<bool> m_terminate;
    std::thread m_thread;
};

//Consumer.cpp
void Consumer::Start()
{
    m_thread = std::thread(&EncodingProcessor::Run, this);
}
void Consumer::Release()
{
    m_terminate = true;
    if (m_thread.joinable())
        m_thread.join();        //Here is the deadlock
}
void Consumer::Run()
{
    while (true)
    {
        if (m_terminate)
            break;

        //This queue is blocking, it is fed with events from an encoder
        PMVR::HardwareEncoder::EventWithId currentEvent = m_hwEncoder.GetEncodeEvent();
        //If there is an event, again, wait for it, until the according frame is encoded
        WaitForSingleObject(currentEvent.handle, INFINITE);
        //Lock the encoded bitstream
        PMVR::HardwareEncoder::BitStreamBufferInfo currentData =
        m_hwEncoder.LockBitstream(currentEvent.id);
        std::vector<unsigned char> tmp((unsigned char*)currentData.bitstreamPointer,
        (unsigned char*)currentData.bitstreamPointer + currentData.bitstreamSize);
        //And process it.
        m_consumer->ProcessEncodingResult(currentData.bitstreamPointer, currentData.bitstreamSize);
        m_hwEncoder.UnlockBitstream(currentEvent.id);
    }
}

所以我可以启动线程。线程做它应该做的。我可以结束线程,这样Run()中的循环就被打破了。但是,如果我想加入线程,就会遇到死锁。

我们不是在谈论在main()完成后被释放的线程。我可以Release()它通过按键,但不是时间它将工作。

编辑:

Start()是这样调用的:

m_processorThread = new Consumer(*m_hwEncoder,
    std::make_unique<FileSystemWriter>("file.h264"));
m_processorThread->Start();

Release()被这样调用:

if (glfwGetKey(handler->GetWindow(), GLFW_KEY_M) && !m_pressed)
{
    m_pressed = true;
    sessionAPI.Close();
}

sessionAPI.close()只调用Release()。仅此而已。

Edit2:

对不起,你是对的。到目前为止,我发布的代码正在工作……所以问题似乎在Run()方法内部(更新,见上文)。

所以我的误解是,因为在循环的顶部中断,它下面的所有东西都不会被执行…看起来是GetEncodeEvent()产生了死锁。但是为什么呢?是否有一种优雅的方法可以在线程不等待某事的时候中断整个循环?此外,事件的提供者仍然是活动的,所以应该有通知…

我认为问题在这里:

{
    if (m_terminate)
        break;

    //This queue is blocking, it is fed with events from an encoder
    PMVR::HardwareEncoder::EventWithId currentEvent = m_hwEncoder.GetEncodeEvent();
    //If there is an event, again, wait for it, until the according frame is encoded
    WaitForSingleObject(currentEvent.handle, INFINITE);

设置m_terminatetrue都很好,但是你的线程不在那里。它被阻塞在WaitForSingleObject行。

这是使用std::condition_variable的一个很好的理由。

的例子:

#include <condition_variable>
#include <mutex>
#include <thread>
#include <queue>
#include <cassert>
struct some_work {};
struct worker
{
  void start()
  {
    assert(_stopped);
    _stopped = false;
    // memory fence happened here. The above write is safe
    _thread = std::thread(&worker::run, this);
  }
  void stop()
  {
    auto lock = std::unique_lock<std::mutex>(_sc_mutex);
    // this is a memory fence
    assert(!_stopped);
    _stopped = true;
    // so is this
    lock.unlock();
    // notify_all, in case someone adds a thread pool and does not look here!
    // note: notify *after* we have released the lock.
    _state_changed.notify_all(); 
    if (_thread.joinable())
      _thread.join();
  }
  void post_work(some_work w)
  {
    auto lock = std::unique_lock<std::mutex>(_sc_mutex);
    assert(!_stopped);
    _workload.push(std::move(w));
    lock.unlock();
    // only notify one - we only added one piece of work.
    _state_changed.notify_one();
  }
  // allows a monitor to wait until all work is flushed before
  // stopping if necessary
  void wait()
  {
    auto lock = std::unique_lock<std::mutex>(_sc_mutex);
    _maybe_stop.wait(lock, [this] 
                        {
                          return should_stop()
                            or no_more_work();
                        });
  }
private:
  void run()
  {
    std::unique_lock<std::mutex> lock(_sc_mutex);
    _state_changed.wait(lock, [this]
                        { 
                          return this->work_to_do() 
                            or this->should_stop();
                        });
    if (should_stop())
      return;
    // there is work to do...
    auto my_work = std::move(_workload.front());
    _workload.pop();
    lock.unlock();
    // do my work here, once we've locked.
    // this is here for the wait() function above.
    // if you don't want a wait(), you can dump this
    lock.lock();
    if (no_more_work() or should_stop())
    {
        lock.unlock();
        _maybe_stop.notify_all();
    }
  }
  bool work_to_do() const { return not _workload.empty(); }
  bool no_more_work() const { return _workload.empty(); }
  bool should_stop() const { return _stopped; }
  std::mutex _sc_mutex;
  std::condition_variable _state_changed;
  std::condition_variable _maybe_stop;
  std::queue<some_work> _workload;
  std::thread _thread;
  bool _stopped = true;
};
int main()
{
  worker w;
  w.start();
  w.post_work(some_work());
  w.post_work(some_work());
  w.post_work(some_work());
  w.post_work(some_work());
  w.post_work(some_work());
  w.post_work(some_work());
  // do we want to ensure that all the work is done?
  w.wait();
  w.stop();
}

您的代码表明GetEncodeEvent正在阻塞。如果这是真的,那么你的代码可能会无限期地坐在那行代码上,而不会看到m_terminate中的变化。随后,代码可以无限期地在WaitForSingleObject .

您可能需要考虑在整个函数中测试m_terminate

你不能中断WaitForSingleObject,但你可以指定一个超时,并简单地把它包装在一个循环

for (;;) {
    if (m_terminate)
        return;
    auto res = WaitForSingleObject(currentEvent.handle, 20);
    switch (res) { // check the return value
      case WAIT_TIMEOUT: continue;
      case WAIT_OBJECT_0: break;
      default:  error(...);
    }
}

您的另一个选择是为线程创建一个WaitEvent,并使用WaitForMultipleObjects两个句柄,并在Consumer::Release中使用SetEvent来通知线程。

最新更新