c - Pthreads,试图使用信号量从多个线程中打印出一段数字



我遇到的问题是:我有我的"父"通过一个for循环,从0到4。在每次迭代中,我有3个线程,我想打印出它们的12个数字(0到11)的"段"。

例如,当父循环为0时,线程0将打印0,线程1将打印4,线程2将打印8。

当父循环为1时,线程0将打印1,线程1将打印5,线程2将打印9。

所以理想的输出是0 1 2 3 4 5 6 7 8 9 10 11。我知道您无法确定线程何时运行,因此它不会完全按照这个顺序运行,但我至少希望在线程打印3 4或5之前打印0 1和2。

这就是我有问题的地方。似乎有一个线程或另一个线程被遗忘了,没有打印出它的段,或者没有和其他线程一起完全打印出它的段。这是我正在做的一个问题,试图完全理解信号量。

我有两个信号量,一个在线程工作时阻塞父线程(生产者?),另一个阻塞每个线程,直到生产者增加到下一个索引。我以为这样我就可以迫使他们等待对方完成后再继续,但由于某些原因,我遇到了问题。

下面是我的代码:

#include <semaphore.h>
#include <stdio.h>
#include <pthread.h>
#define maxNum 12
sem_t parentSem;
sem_t threadSem;
int finishedThreads;
int done = 0;
int stuff[12];
int i;
void* threadFunction(int m){
    int printNum;
    int baseNum;
    //Determine the value the thread should start at
    baseNum = (double)(maxNum/3) * m;
    while(!done){ //ensure thread doesn't exit before parent is done with whole loop
        //wait for parent to increment
        sem_wait(&threadSem);
        printNum = baseNum + i;
            //keep track of how many threads are finished to let parent continue
        finishedThreads++;
        if(finishedThreads == 3){
                    //let parent continue if all threads are finished
            sem_post(&parentSem);
        }
    }
}
int main(int argc, char** argv[]){
    sem_init(&parentSem, 0, 1);
    sem_init(&threadSem, 0, 0);
    int rc;
    pthread_t threads[3];
    int l; 
    for(l = 0; l < 12; l++){
        stuff[l] = l;
    }
    int j;
    for(j = 0; j < 3; j++){
        rc = pthread_create(&threads[i], NULL, threadFunction, (void*) j);
    }
    int k;
    for(i = 0; i < 4; i++){
        sem_wait(&parentSem); //wait for children here (initially sem set to 1)
        finishedThreads = 0; //set finished thread counter to 0
        for(k = 0; k < 3; k++){
            //increment thread semaphore to 3 so each thread can run
            sem_post(&threadSem);
        }
    }
    for(i = 1; i < 3; i++){
        pthread_join(threads[i], NULL);
    }

}

如何确保所有线程在父线程递增之前运行?我怎么能确保所有线程运行每"轮"没有任何被卡在后面?有时同一个线程运行两次,而不是每个线程运行一次.....帮助吗?

非常感谢你们的帮助。

编辑:新代码状态:(线程函数)

while(!done){
    printf("Thread %d not done yet...n", m);
    if(m == 0){
        sem_wait(&threadSem0);
    }else if(m == 1){
        sem_wait(&threadSem1);
    }else if(m == 2){
        sem_wait(&threadSem2);
    }
    printNum = baseNum + i;
    printf("Thread %d past waiting, number segment: %dn", m, printNum);
    finishedThreads++;
    if(finishedThreads == 3){
        sem_post(&parentSem);
    }
}

家长部分:

for(i = 0; i < 4; i++){
    printf("In parent for loop, counter: %dn", i);
    printf("Parent past wait semaphoren");
    finishedThreads = 0;
    if(i == 3) done = 1;
    sem_post(&threadSem0);
    sem_post(&threadSem1);
    sem_post(&threadSem2);
    sem_wait(&parentSem);
}
for(i = 1; i < 3; i++){
    pthread_join(threads[i], NULL);
}

您需要对finishedThreads进行原子访问,以避免每个线程只使用一个缓存副本。您的程序在没有优化(*)的情况下编译时工作吗?

在c++中正确的方法是使用std::atomic。请看这个问题的完整答案。

更新:

如果线程正在快速循环,就像您的情况一样,为所有子线程使用单个信号量可能会产生问题。你必须为每个子线程使用一个信号量来解决这个问题。

sem_t threadSem[3];
// ...
sem_wait(&threadSem+m);
// ...
for (i = 0; i < 3; i++)
    sem_init(threadSem+i, 0, 0);
// ...
    for(k = 0; k < 3; k++){
        //increment each thread semaphore so each thread can run
        sem_post(threadSem+k);
    }

还有一些我之前没有注意到的事情:

您使用变量i来增加threadFunction。这行不通。正确的公式应该只取决于m。由于每个线程都有自己的baseNumprintNum副本,因此可以毫无问题地使用它们:

baseNum = (double)(maxNum/3) * m;
while(!done){ //ensure thread doesn't exit before parent is done with whole loop
    //wait for parent to increment
    sem_wait(&threadSem);
    printNum++;
    // ....

你的代码中有很多东西取决于子线程的数量,我建议使用一个常数(就像你为maxNum所做的那样)。

您不设置变量done来指示子线程的工作结束。

volatile int done;
// in threadFunction
 // wait for start
sem_wait(threadSem+m)
while(!done){ 
    // thread work here
    // ...
    //wait for parent to increment at the end of the loop
    sem_wait(threadSem+m);
}
// in main
for(k = 0; k < 3; k++){
    //increment each thread semaphore so each thread can run
    if (i == 4) done = 1;
    sem_post(threadSem+k);
}

当然done也必须遵循" volatile或互斥"规则。

你不把所有的线程连接到最后:

for(i = 0; i < 3; i++){
    pthread_join(threads[i], NULL);
}

(*)如果您正在使用gcc,我的意思是使用参数-O0,甚至使用-g调试

您需要一个屏障,特别是可重用的屏障。看一看Allen Downey的《信号量小书》3.6.7节,可能会有帮助,转载在这里(PDF)。代码是用Python编写的,但它的要点很清楚。它需要一个互斥锁和两个计数信号量,在您的例子中,barrier中有4个参与者。

你可以通过设置线程优先级来控制哪个线程首先被唤醒,这样第一个线程是最重要的,等等

最新更新