我有一个单线程应用程序,它使用boost::lockfree::queue
(来自Boost 1.57)。(我使用队列的原因是它是多线程应用程序中也使用的库的一部分。)
队列似乎正在以某种方式丢失元素。
使用队列的类的行为如下:
- 有两个单独的函数在队列上调用
push
。除了队列本身提供的之外,没有线程安全逻辑 - 有一个函数消耗队列中的元素;它使用
while(my_queue.pop(temp_element))
循环来消耗所有元素。此函数由互斥锁保护,以确保所有元素都指向单个使用者。但是,如果在一行中检测到两个重复的元素(与==
相比),则跳过第二个元素
我观察到元素似乎没有在系统中一致传播,因此使用std::atomic_uint
s,我创建了一些计数器来检查离开队列的警报数量是否与进入队列的警报数相同:
- 两种
push
ing方法中的每种方法都有单独的计数器。我们将这些称为pushed_method1
和pushed_method2
-
"已处理"元素和"跳过"元素有单独的计数器。我们将这些称为
consumed
和skipped
。在循环中,它们更新如下(重命名变量,并在[square brackets]
中使用描述性伪代码):ElementType previous{[ initialize in invalid state ]}; while (my_queue.pop(temp_element)) { if (! [ check if `previous` is valid ] || previous != temp_element) { [ ... log a message ] [ ... insert the element into a vector to be returned ] ++consumed; } else { // ... log a message ++skipped; } previous = temp_element; }
在循环之后,我检查计数器和队列的状态,并进行健全性检查:
auto postconsume_pushed_method1 = pushed_method1.load();
auto postconsume_pushed_method2 = pushed_method2.load();
auto all_consumed = my_queue.empty();
assert(
!all_consumed
|| postconsume_pushed_method1 + postconsume_pushed_method2
== consumed + skipped);
这个断言通常通过,但在我的单线程应用程序中,它有时会失败。(在多线程的情况下,它有时也会失败,但我希望专注于单线程的情况,以简化问题。)
在多线程的情况下,我可以想象操作正在被重新排序,以便在.empty()
检查之后出现一个或多个load
,但我希望情况并非如此,因为我希望std::atomic
和boost::lockfree
包括用于这类事情的内存围栏。
然而,在单线程的情况下,这应该无关紧要,因为在应用程序执行"consume"方法时,元素永远不应该是push
ed,并且在循环之后,警报队列应始终为空(因为在pop
返回false
之前,循环不会中断)。
我是不是误解了什么?我的逻辑有错误吗?我在boost::lockfree::queue
中发现错误的可能性很小吗?
队列是无锁的,因此当队列已满时,push
不会阻塞。
lockfree::queue::push
返回一个布尔值,告诉元素是否已进入队列。当队列已满时,push
返回false
,因此您可能需要检查是否存在这种情况。