我正在使用倒锁作为信号灯来表示队列更新(请注意注释掉的Sleep(1)
,稍后会用到(:
#include <stdio.h>
#include <omp.h>
#include <queue>
#include <stdint.h>
#include <windows.h>
class ThreadLock
{
protected:
omp_lock_t lock;
public:
ThreadLock() {
omp_init_lock(&lock);
}
~ThreadLock() {
omp_destroy_lock(&lock);
}
void acquire() {
omp_set_lock(&lock);
}
void release() {
omp_unset_lock(&lock);
}
};
std::queue< uint32_t > g_queue;
ThreadLock g_lock;
void producer()
{
uint32_t seq = 0;
g_lock.acquire();
while (true) {
Sleep(200);
#pragma omp critical
g_queue.push(++seq);
printf("Produced %un", seq);
g_lock.release();
//Sleep(1);
g_lock.acquire();
}
g_lock.release();
}
void consumer()
{
while (true) {
// Lock if empty
if (g_queue.empty()) {
printf("[Consumer] Acquiring lockn");
g_lock.acquire();
g_lock.release();
printf("[Consumer] Released lockn");
if (g_queue.empty()) {
printf("Still emptyn");
Sleep(100);
continue;
}
}
#pragma omp critical
{
printf("Consumed %un", g_queue.front());
g_queue.pop();
}
}
}
int main(int argc, char* argv[])
{
#pragma omp parallel sections
{
#pragma omp section
consumer();
#pragma omp section
producer();
}
return 0;
}
此代码包含一个争用条件,它会在一段时间后使使用者停止,如下所示:
[Consumer] Acquiring lock
Produced 1
Produced 2
[Consumer] Released lock
Consumed 1
Consumed 2
[Consumer] Acquiring lock
Produced 3
Produced 4
Produced 5
Produced 6
Produced 7
Produced 8
Produced 9
Produced 10
Produced 11
Produced 12
Produced 13
Produced 14
Produced 15
Produced 16
Produced 17
Produced 18
Produced 19
看来生产者线程在没有上下文切换的情况下匆忙完成发布/获取。好。让我们通过取消注释来强制它Sleep(1)
:
[Consumer] Acquiring lock
Produced 1
[Consumer] Released lock
Consumed 1
[Consumer] Acquiring lock
[Consumer] Released lock
Still empty
[Consumer] Acquiring lock
Produced 2
[Consumer] Released lock
Consumed 2
[Consumer] Acquiring lock
[Consumer] Released lock
Still empty
[Consumer] Acquiring lock
Produced 3
[Consumer] Released lock
Consumed 3
注意到那些Still empty
行了吗?看起来消费者设法在生产者的发布/获取行之间粘上额外的处理轮次。
我知道在消费者线程中添加另一个Sleep(1)
可以解决问题。但我觉得代码中这些固定的人为延迟是错误的(Sleep(200)
不算数,它仅用于演示目的(。
如何以正确的方式完成此操作,使用 OpenMP 和没有高于 2.0 的 OpenMP 版本?
您的代码中存在几个问题。您正在混合#pragma omp critical
和锁 - 这没有多大意义。您真正想要的是锁(用于保护队列上的所有操作(和条件变量(用于接收有关元素插入的通知(的组合。不幸的是,OpenMP 不提供条件变量的原语。您还可以对队列中的元素数使用计数信号量 - 这在 OpenMP 中也不可用。
然后是饥饿问题,您尝试用sleep
来解决 - 无论操作系统切换任务的任何提示,它都不会完美。您可以考虑使用 OpenMP 任务 +taskyield
(但这不是 OpenMP 2.0(。
归根结底,OpenMP 不太适合这种工作。OpenMP 更专注于拥有 1 线程 - 1 核心映射和分发并行循环。您可以将 OpenMP 线程与 C++11std::lock
/std::condition_variable
结合使用。虽然它在实践中可能有效,但它没有得到标准的正式支持。
注: 保护队列上的操作时,必须保护所有调用,包括g_queue.empty()
。