wait()
和signal()
信号量的经典none-busy-waiting
版本实现如下:在这个版本中,value
可以是负的。
//primitive
wait(semaphore* S)
{
S->value--;
if (S->value < 0)
{
add this process to S->list;
block();
}
}
//primitive
signal(semaphore* S)
{
S->value++;
if (S->value <= 0)
{
remove a process P from S->list;
wakeup(P);
}
}
问题:以下版本是否也正确?这里我先测试并修改值。如果你能给我展示一个它不起作用的场景,那就太好了。
//primitive wait().
//If (S->value > 0), the whole function is atomic
//otherise, only if(){} section is atomic
wait(semaphore* S)
{
if (S->value <= 0)
{
add this process to S->list;
block();
}
// here I decrement the value after the previous test and possible blocking
S->value--;
}
//similar to wait()
signal(semaphore* S)
{
if (S->list is not empty)
{
remove a process P from S->list;
wakeup(P);
}
// here I increment the value after the previous test and possible waking up
S->value++;
}
编辑:
我的动机是想弄清楚我是否可以使用后一个版本来实现互斥,并且没有死锁,没有饥饿
修改后的版本引入了一个竞争条件:
- 线程A: if(S->Value <0)//值= 1
- 线程B: if(S->Value <0)//值= 1
- 线程A: S->值——;//值= 0
- 线程B: S->值——;//值= -1
两个线程都获得了count=1信号量。哦。请注意,即使它们是不可抢占的(见下文),也存在另一个问题,但为了完整性,这里将讨论原子性以及真实锁定协议如何工作。
在处理这样的协议时,确定要使用的原子原语是非常重要的。原子原语是这样的,它们似乎是即时执行的,而不会与任何其他操作交叉。你不能把一个大函数叫做原子函数;你必须以某种方式使成为原子,使用其他原子原语。
大多数cpu都提供了一个叫做"原子比较和交换"的原语。从这里开始,我将缩写为compxchg。语义如下:
bool cmpxchg(long *ptr, long old, long new) {
if (*ptr == old) {
*ptr = new;
return true;
} else {
return false;
}
}
cmpxchg
不是用这个代码实现的。它在CPU硬件中,但行为有点像这样,只是在原子上。
- add_waitqueue(waitqueue) -将我们的进程状态设置为睡眠并将我们添加到等待队列中,但是继续执行 (ATOMIC)
- schedule() -切换线程。如果我们处于睡眠状态,我们不会再次运行,直到唤醒(阻塞)
- remove_waitqueue(waitqueue) -将我们的进程从等待队列中移除,然后将我们的状态设置为唤醒,如果它还没有(ATOMIC)
- memory_barrier() -确保在点之前逻辑上的任何读/写实际上在点之前执行,避免令人讨厌的内存排序问题(我们假设所有其他原子原语都带有自由内存屏障,尽管这并不总是正确的)(CPU/COMPILER PRIMITIVE)
void sem_down(sem *pSem)
{
while (1) {
long spec_count = pSem->count;
read_memory_barrier(); // make sure spec_count doesn't start changing on us! pSem->count may keep changing though
if (spec_count > 0)
{
if (cmpxchg(&pSem->count, spec_count, spec_count - 1)) // ATOMIC
return; // got the semaphore without blocking
else
continue; // count is stale, try again
} else { // semaphore count is zero
add_waitqueue(pSem->wqueue); // ATOMIC
// recheck the semaphore count, now that we're in the waitqueue - it may have changed
if (pSem->count == 0) schedule(); // NOT ATOMIC
remove_waitqueue(pSem->wqueue); // ATOMIC
// loop around again to try to acquire the semaphore
}
}
}
您将注意到,在实际的semaphore_down函数中,对非零pSem->count
的实际测试是由cmpxchg
完成的。你不能相信任何其他人;该值可以在读取值后立即更改。我们根本无法将值检查和值修改分开。
这里的spec_count
是投机性的。这很重要。我实际上是在猜测计数会是多少。这是一个很好的猜测,但这只是猜测。如果我的猜测是错误的,cmpxchg
将失败,此时例程必须循环并再次尝试。如果我猜是0,那么我要么会被唤醒(因为当我在等待队列中时它不再是0),要么我会注意到在调度测试中它不再是0了。
您还应该注意,不可能使包含阻塞操作的函数成为原子的。这是荒谬的。根据定义,原子函数似乎是立即执行的,不与其他任何东西交叉。但是一个阻塞函数,根据定义,等待其他事情发生。这是不一致的。同样,没有原子操作可以在阻塞操作中被"拆分",就像在你的例子中一样。
现在,您可以通过声明函数不可抢占来消除这种复杂性。通过使用锁或其他方法,您只需确保一次只有一个线程在信号量代码中运行(当然不包括阻塞)。但问题仍然存在。从值0开始,其中C已将信号量取下两次,然后:
- 线程A: if (S->Value <0)//值= 0
- 线程A:阻塞....
- 线程B: if (S->Value <0)//值= 0
- 线程B:阻塞....
- 线程C: S->Value++//Value = 1
- 线程C:唤醒(A)
- (线程C再次调用signal())
- 线程C: S->Value++//Value = 2
- 线程C:唤醒(B) (线程C调用wait())线程C: if (S->Value <= 0)//Value = 2
- 线程C: S->Value——//Value = 1
- //A和B已被唤醒
- 线程A: S->值——//值= 0
- 线程B: S->Value——//Value = -1
你可以用一个循环来重新检查S->value -来修复这个问题,假设你在一个单处理器机器上,你的信号量代码是可抢占的。不幸的是,这些假设在所有桌面操作系统上都是错误的:)
关于真正的锁协议如何工作的更多讨论,您可能对论文"Fuss, Futexes和fuwocks: Linux中的快速用户级锁定"感兴趣