具有两个线程(生产者、消费者)的 STL 队列



我想要队列安全的关键部分,以便线程不会同时访问队列。即使我注释与"关键"部分相关的行,此代码也有效。谁能解释为什么?

queue<int> que;
CRITICAL_SECTION csection;
int i=0;
DWORD WINAPI ProducerThread(void*)
{
    while(1)
    {
        //if(TryEnterCriticalSection(&csection))
        {
            cout<<"Pushing value "<<i<<endl;
            que.push(i++);
            //LeaveCriticalSection(&csection);
        }
    }
}
//Consumer tHread that pops out the elements from front of queue
DWORD WINAPI ConsumerThread(void*)
{
    while(1)
    {
        //if(TryEnterCriticalSection(&csection))
        {
            if(!que.empty())
            {
                cout<<"Value in queue is "<<que.front()<<endl;
                que.pop();
            }
            else
                Sleep(2000);
            //LeaveCriticalSection(&csection);
        }
    }
}
int _tmain(int argc, _TCHAR* argv[])
{
    HANDLE handle[2];
    //InitializeCriticalSection(&csection);
    handle[0]=NULL;
    handle[1]=NULL;
    handle[0]=CreateThread(0,0,(LPTHREAD_START_ROUTINE)ProducerThread,0,0,0);
    if(handle[0]==NULL)
        ExitProcess(1);
    handle[1]=CreateThread(0,0,(LPTHREAD_START_ROUTINE)ConsumerThread,0,0,0);
    if(handle[1]==NULL)
        ExitProcess(1);
    WaitForMultipleObjects(2,handle,true,INFINITE);
    return 0;
}

这是偶然的,大概有两个原因:

  1. 它不起作用,但你永远不会注意到。使用者拉取队列中的任何内容,或者它认为队列中的任何内容。如果什么都没有,它会一直睡到制片人推了什么。这"有效"是因为生产者只附加到末尾,而消费者只从开头阅读。除了更新size.您最终很可能会拥有一个处于有元素但size不反映它的状态的队列。这很讨厌,但相反的情况迟早也会发生,甚至更令人讨厌。
    你无从得知。好吧,您最终可能会知道排队的工作项是否由于某种原因"消失",或者如果内存不足,但请尝试找出原因。
  2. 您可以使用 printf(或 std::cout ,这是相同的),它由关键部分在内部锁定。这种"某种"以您需要的方式锁定对队列的访问,除非它没有。它将在 99.9% 的时间内工作(意外地,因为消费者在尝试打印时会被阻止,这比附加到队列的生产者需要更长的时间来唤醒)。但是,当上下文切换在打印后发生时,它会突然失败。砰,你死定了。

您确实绝对需要使用关键部分对象或互斥锁来保护关键代码段。否则,结果是不可预测的。与人们可能认为的相反,"但它有效"不是一件好事,它是可能发生的最糟糕的事情。因为它只工作,直到它不工作,然后你不知道为什么。

也就是说,您可以使用 IO 完成端口,它非常有效地为您完成所有工作。您可以使用 GetQueuedCompletionStatus 从端口中提取"事件",并使用 PostQueuedCompletionStatus 发布一个事件。完成端口执行队列的整个处理,包括为您正确同步到多个使用者(它以 LIFO 顺序执行此操作,这对于避免上下文切换和缓存失效是有利的)。
每个事件都包含一个指向OVERLAPPED结构的指针,但完成端口不使用它,您可以只传递任何指针(或者,如果您感觉更好,则传递一个指向OVERLAPPED的指针,后跟您自己的数据)。

在您的特定情况下,cout 将比"get"花费数百倍的时间。 当队列为空时,您就会睡觉,这允许另一个线程在您的"消费者"线程获取任何队列之前填满大量队列。

全速运行(无调试打印,无睡眠),确保运行很长时间,并用简单的数学检查另一端的值。

像这样:

int old_val = val;
while(1)
{
    if(!que.empty())
    {
       int  val = que.front();
       que.pop();
       if (old_val+1 != val)
       {
          /// Do something as things have gone wrong!
       }
     }
}

请注意,这可能不会立即/微不足道地出错。您希望运行它数小时,最好在计算机上运行其他内容 - 类似于批处理文件:

@echo off
:again 
dir c: /s > NUL:
goto again

[自从我为 Windows 编写批处理脚本以来已经有一段时间了,所以这可能不是 100% 正确的,但我认为你应该能够用谷歌搜索我出错的答案 - 这个想法是"intterrupt"机器]。

此外,尝试运行线程对的多个副本,每对线程具有单独的队列 - 这将强制进行更多的调度活动,并可能引发问题。

就像安东说的,其中一些东西通常很难复制。我在实时操作系统中遇到了一个问题,队列被搞砸了 - 唯一真实的迹象是内存最终在"压力测试"期间耗尽(它做"随机"的事情,包括几个不同的中断源)。该操作系统已经在生产测试中用数百个单元进行了测试,并作为真正的生产系统在现场发布[在不同处理器上运行的相同代码在世界各地运行电话交换机,再次,没有客户抱怨内存泄漏],似乎没有内存泄漏!但是队列处理中的一个"漏洞",在一个函数中,只运行。在认为是压力测试本身偶尔会遇到队列堆积的奇怪情况之后,我最终发现了实际问题 - 队列的读写之间的中断命中 - 一个正好有两个指令的漏洞,并且只有当一个中断例程在发送消息时被另一个中断例程中断时......我宁愿不再调试那个!

CRITICAL_SECTION防止的错误类型的最大问题之一是很难重现它们。你必须预测它是如何失败的,而无法证明它。

当您保护自己的代码而不是包装非线程安全的库调用时,通常可以通过向某个位置添加Sleep来触发争用条件。在您发布的代码中,没有机会为生产者执行此操作(无论破坏了什么不变量,它都是在que.push内部完成的),并且当只有一个消费者时,消费者检查空队列的潜在 TOCTTOU 问题不存在。如果我们可以将Sleep添加到队列实现中,那么我们将能够以可预测的方式使事情出错。

如果只有一个生产者和一个使用者,则队列代码对于此类弹出轮询可能是安全的。 如果生产者推送使用临时索引/指针将数据插入到下一个空队列位置,并且仅将递增的"临时索引"存储到队列"下一个空"成员中,则 queue.empty 可以返回 true,直到使用者可以安全地弹出数据。这种操作可能是设计好的,也可能是偶然产生的。

一旦你拥有不止一个生产者或多个消费者,它迟早肯定会爆炸。

编辑 - 即使队列对于一个生产者和一个使用者来说是安全的,除非它被记录下来,否则你不应该依赖它 - 一些 b***d 会在下一个版本中更改实现:(

最新更新