我可以在等待/信号量中切换测试和修改部分吗?



wait()signal()信号量的经典none-busy-waiting版本实现如下:在这个版本中,value可以是负的。

//primitive
wait(semaphore* S)
{
    S->value--;
    if (S->value < 0)
    {
        add this process to S->list;
        block();
    }
}
//primitive
signal(semaphore* S)
{
    S->value++;
    if (S->value <= 0)
    {
        remove a process P from S->list;
        wakeup(P);
    }
}

问题:以下版本是否也正确?这里我先测试并修改值。如果你能给我展示一个它不起作用的场景,那就太好了。

//primitive wait().
//If (S->value > 0), the whole function is atomic
//otherise, only if(){} section is atomic
wait(semaphore* S)
{
    if (S->value <= 0)
    {
        add this process to S->list;
        block();
    }
    // here I decrement the value after the previous test and possible blocking
    S->value--;
}
//similar to wait()
signal(semaphore* S)
{
    if (S->list is not empty)
    {
        remove a process P from S->list;
        wakeup(P);
    }
    // here I increment the value after the previous test and possible waking up
    S->value++;
}

编辑:

我的动机是想弄清楚我是否可以使用后一个版本来实现互斥,并且没有死锁,没有饥饿

修改后的版本引入了一个竞争条件:

  • 线程A: if(S->Value <0)//值= 1
  • 线程B: if(S->Value <0)//值= 1
  • 线程A: S->值——;//值= 0
  • 线程B: S->值——;//值= -1

两个线程都获得了count=1信号量。哦。请注意,即使它们是不可抢占的(见下文),也存在另一个问题,但为了完整性,这里将讨论原子性以及真实锁定协议如何工作。

在处理这样的协议时,确定要使用的原子原语是非常重要的。原子原语是这样的,它们似乎是即时执行的,而不会与任何其他操作交叉。你不能把一个大函数叫做原子函数;你必须以某种方式使成为原子,使用其他原子原语。

大多数cpu都提供了一个叫做"原子比较和交换"的原语。从这里开始,我将缩写为compxchg。语义如下:

bool cmpxchg(long *ptr, long old, long new) {
    if (*ptr == old) {
        *ptr = new;
        return true;
    } else {
        return false;
    }
}

cmpxchg不是用这个代码实现的。它在CPU硬件中,但行为有点像这样,只是在原子上。

现在,让我们添加一些额外的有用函数(基于其他原语构建):
  • add_waitqueue(waitqueue) -将我们的进程状态设置为睡眠并将我们添加到等待队列中,但是继续执行 (ATOMIC)
  • schedule() -切换线程。如果我们处于睡眠状态,我们不会再次运行,直到唤醒(阻塞)
  • remove_waitqueue(waitqueue) -将我们的进程从等待队列中移除,然后将我们的状态设置为唤醒,如果它还没有(ATOMIC)
  • memory_barrier() -确保在点之前逻辑上的任何读/写实际上在点之前执行,避免令人讨厌的内存排序问题(我们假设所有其他原子原语都带有自由内存屏障,尽管这并不总是正确的)(CPU/COMPILER PRIMITIVE)
下面是一个典型的信号量获取例程。它比您的示例要复杂一些,因为我已经明确地确定了我使用的原子操作:
void sem_down(sem *pSem)
{
    while (1) {
        long spec_count = pSem->count;
        read_memory_barrier(); // make sure spec_count doesn't start changing on us! pSem->count may keep changing though
        if (spec_count > 0)
        {
            if (cmpxchg(&pSem->count, spec_count, spec_count - 1)) // ATOMIC
                return; // got the semaphore without blocking
            else
                continue; // count is stale, try again
        } else { // semaphore count is zero
            add_waitqueue(pSem->wqueue); // ATOMIC
            // recheck the semaphore count, now that we're in the waitqueue - it may have changed
            if (pSem->count == 0) schedule(); // NOT ATOMIC
            remove_waitqueue(pSem->wqueue); // ATOMIC
            // loop around again to try to acquire the semaphore
        }
    }
}

您将注意到,在实际的semaphore_down函数中,对非零pSem->count的实际测试是由cmpxchg完成的。你不能相信任何其他人;该值可以在读取值后立即更改。我们根本无法将值检查和值修改分开。

这里的spec_count投机性的。这很重要。我实际上是在猜测计数会是多少。这是一个很好的猜测,但这只是猜测。如果我的猜测是错误的,cmpxchg将失败,此时例程必须循环并再次尝试。如果我猜是0,那么我要么会被唤醒(因为当我在等待队列中时它不再是0),要么我会注意到在调度测试中它不再是0了。

您还应该注意,不可能使包含阻塞操作的函数成为原子的。这是荒谬的。根据定义,原子函数似乎是立即执行的,不与其他任何东西交叉。但是一个阻塞函数,根据定义,等待其他事情发生。这是不一致的。同样,没有原子操作可以在阻塞操作中被"拆分",就像在你的例子中一样。

现在,您可以通过声明函数不可抢占来消除这种复杂性。通过使用锁或其他方法,您只需确保一次只有一个线程在信号量代码中运行(当然不包括阻塞)。但问题仍然存在。从值0开始,其中C已将信号量取下两次,然后:

  • 线程A: if (S->Value <0)//值= 0
  • 线程A:阻塞....
  • 线程B: if (S->Value <0)//值= 0
  • 线程B:阻塞....
  • 线程C: S->Value++//Value = 1
  • 线程C:唤醒(A)
  • (线程C再次调用signal())
  • 线程C: S->Value++//Value = 2
  • 线程C:唤醒(B)
  • (线程C调用wait())线程C: if (S->Value <= 0)//Value = 2
  • 线程C: S->Value——//Value = 1
  • //A和B已被唤醒
  • 线程A: S->值——//值= 0
  • 线程B: S->Value——//Value = -1

你可以用一个循环来重新检查S->value -来修复这个问题,假设你在一个单处理器机器上,你的信号量代码是可抢占的。不幸的是,这些假设在所有桌面操作系统上都是错误的:)

关于真正的锁协议如何工作的更多讨论,您可能对论文"Fuss, Futexes和fuwocks: Linux中的快速用户级锁定"感兴趣

最新更新