c-如何在没有联接的情况下同步管理器/工作程序pthreads



我熟悉多线程,并成功地用Java和Objective-C开发了许多多线程程序。但是,如果不使用主线程的连接,我就无法使用pthreads在C中实现以下目标:

#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#define NUM_OF_THREADS 2
struct thread_data {
    int start;
    int end;
    int *arr;
};
void print(int *ints, int n);
void *processArray(void *args);
int main(int argc, const char * argv[])
{
    int numOfInts = 10;
    int *ints = malloc(numOfInts * sizeof(int));
    for (int i = 0; i < numOfInts; i++) {
        ints[i] = i;
    }
    print(ints, numOfInts); // prints [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
    pthread_t threads[NUM_OF_THREADS];
    struct thread_data thread_data[NUM_OF_THREADS];
    // these vars are used to calculate the index ranges for each thread
    int remainingWork = numOfInts, amountOfWork;
    int startRange, endRange = -1;
    for (int i = 0; i < NUM_OF_THREADS; i++) {
        amountOfWork = remainingWork / (NUM_OF_THREADS - i);
        startRange = endRange + 1;
        endRange   = startRange + amountOfWork - 1;
        thread_data[i].arr   = ints;
        thread_data[i].start = startRange;
        thread_data[i].end   = endRange;
        pthread_create(&threads[i], NULL, processArray, (void *)&thread_data[i]);
        remainingWork -= amountOfWork;      
    }
    // 1. Signal to the threads to start working

    // 2. Wait for them to finish

    print(ints, numOfInts); // should print [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
    free(ints);
    return 0;
}
void *processArray(void *args)
{
    struct thread_data *data = (struct thread_data *)args;
    int *arr  = data->arr;
    int start = data->start;
    int end   = data->end;
    // 1. Wait for a signal to start from the main thread

    for (int i = start; i <= end; i++) {
        arr[i] = arr[i] + 1;
    }
    // 2. Signal to the main thread that you're done
    pthread_exit(NULL);
}
void print(int *ints, int n)
{
    printf("[");
    for (int i = 0; i < n; i++) {
        printf("%d", ints[i]);
        if (i+1 != n)
            printf(", ");
    }
    printf("]n");
}

我想在上面的代码中实现以下内容:

在main()中:

  1. 向线程发出开始工作的信号
  2. 等待后台线程完成

处理中阵列():

  1. 等待信号从主线程启动
  2. 向主线发出完成的信号

我不想在主线程中使用联接,因为在实际应用程序中,主线程会创建一次线程,然后它会多次向后台线程发出工作信号,除非所有后台线程都完成了工作,否则我不能让主线程继续。在processArray函数中,我将放置一个无限循环,如下所示:

void *processArray(void *args)
{
    struct thread_data *data = (struct thread_data *)args;
    while (1)
    {
      // 1. Wait for a signal to start from the main thread
      int *arr  = data->arr;
      int start = data->start;
      int end   = data->end;          
      // Process
      for (int i = start; i <= end; i++) {
          arr[i] = arr[i] + 1;
      }
      // 2. Signal to the main thread that you're done
    }
    pthread_exit(NULL);
}

请注意,我是C和posix API的新手,所以如果我遗漏了一些明显的内容,请原谅。但我确实尝试了很多方法,从使用互斥锁和一系列信号量开始,以及两者的混合,但都没有成功。我认为条件变量可能会有所帮助,但我不明白如何使用它。

谢谢你抽出时间。

问题已解决:

非常感谢你们!通过遵循您的提示,我终于能够在不使用联接的情况下安全地工作。尽管这个解决方案有点难看,但它完成了任务,性能提升是值得的(如下所示)。对于任何感兴趣的人来说,这是我正在开发的真实应用程序的模拟,其中主线程不断地向后台线程提供工作:

 #include <stdio.h>
 #include <stdlib.h>
 #include <pthread.h>
 #define NUM_OF_THREADS 5
 struct thread_data {
     int id;
     int start;
     int end;
     int *arr;
 };
 pthread_mutex_t currentlyIdleMutex = PTHREAD_MUTEX_INITIALIZER;
 pthread_cond_t  currentlyIdleCond  = PTHREAD_COND_INITIALIZER;
 int currentlyIdle;
 pthread_mutex_t workReadyMutex = PTHREAD_MUTEX_INITIALIZER;
 pthread_cond_t  workReadyCond  = PTHREAD_COND_INITIALIZER;
 int workReady;
 pthread_cond_t  currentlyWorkingCond = PTHREAD_COND_INITIALIZER;
 pthread_mutex_t currentlyWorkingMutex= PTHREAD_MUTEX_INITIALIZER;
 int currentlyWorking;
 pthread_mutex_t canFinishMutex = PTHREAD_MUTEX_INITIALIZER;
 pthread_cond_t  canFinishCond  = PTHREAD_COND_INITIALIZER;
 int canFinish;
 void print(int *ints, int n);
 void *processArray(void *args);
 int validateResult(int *ints, int num, int start);
 int main(int argc, const char * argv[])
 {
     int numOfInts = 10;
     int *ints = malloc(numOfInts * sizeof(int));
     for (int i = 0; i < numOfInts; i++) {
         ints[i] = i;
     }
 //   print(ints, numOfInts);
     pthread_t threads[NUM_OF_THREADS];
     struct thread_data thread_data[NUM_OF_THREADS];
     workReady = 0;
     canFinish = 0;
     currentlyIdle = 0;
     currentlyWorking = 0;
     // these vars are used to calculate the index ranges for each thread
     int remainingWork = numOfInts, amountOfWork;
     int startRange, endRange = -1;
     // Create the threads and give each one its data struct.
     for (int i = 0; i < NUM_OF_THREADS; i++) {
         amountOfWork = remainingWork / (NUM_OF_THREADS - i);
         startRange = endRange + 1;
         endRange   = startRange + amountOfWork - 1;
         thread_data[i].id    = i;
         thread_data[i].arr   = ints;
         thread_data[i].start = startRange;
         thread_data[i].end   = endRange;
         pthread_create(&threads[i], NULL, processArray, (void *)&thread_data[i]);
         remainingWork -= amountOfWork;
     }
     int loops = 1111111;
     int expectedStartingValue = ints[0] + loops; // used to validate the results
     // The elements in ints[] should be incremented by 1 in each loop
     while (loops-- != 0) {
         // Make sure all of them are ready
         pthread_mutex_lock(&currentlyIdleMutex);
         while (currentlyIdle != NUM_OF_THREADS) {
             pthread_cond_wait(&currentlyIdleCond, &currentlyIdleMutex);
         }
         pthread_mutex_unlock(&currentlyIdleMutex);
         // All threads are now blocked; it's safe to not lock the mutex.
         // Prevent them from finishing before authorized.
         canFinish = 0;
         // Reset the number of currentlyWorking threads
         currentlyWorking = NUM_OF_THREADS;
         // Signal to the threads to start
         pthread_mutex_lock(&workReadyMutex);
         workReady = 1;
         pthread_cond_broadcast(&workReadyCond );
         pthread_mutex_unlock(&workReadyMutex);      
         // Wait for them to finish
         pthread_mutex_lock(&currentlyWorkingMutex);
         while (currentlyWorking != 0) {
             pthread_cond_wait(&currentlyWorkingCond, &currentlyWorkingMutex);
         }
         pthread_mutex_unlock(&currentlyWorkingMutex);
         // The threads are now waiting for permission to finish
         // Prevent them from starting again
         workReady = 0;
         currentlyIdle = 0;
         // Allow them to finish
         pthread_mutex_lock(&canFinishMutex);
         canFinish = 1;
         pthread_cond_broadcast(&canFinishCond);
         pthread_mutex_unlock(&canFinishMutex);
     }
 //   print(ints, numOfInts);
     if (validateResult(ints, numOfInts, expectedStartingValue)) {
         printf("Result correct.n");
     }
     else {
         printf("Result invalid.n");      
     }
     // clean up
     for (int i = 0; i < NUM_OF_THREADS; i++) {
         pthread_cancel(threads[i]);
     }
     free(ints);
     return 0;
 }
 void *processArray(void *args)
 {
     struct thread_data *data = (struct thread_data *)args;
     int *arr  = data->arr;
     int start = data->start;
     int end   = data->end;
     while (1) {
         // Set yourself as idle and signal to the main thread, when all threads are idle main will start
         pthread_mutex_lock(&currentlyIdleMutex);
         currentlyIdle++;
         pthread_cond_signal(&currentlyIdleCond);
         pthread_mutex_unlock(&currentlyIdleMutex);
         // wait for work from main
         pthread_mutex_lock(&workReadyMutex);
         while (!workReady) {
             pthread_cond_wait(&workReadyCond , &workReadyMutex);
         }
         pthread_mutex_unlock(&workReadyMutex);
         // Do the work
         for (int i = start; i <= end; i++) {
             arr[i] = arr[i] + 1;
         }
         // mark yourself as finished and signal to main
         pthread_mutex_lock(&currentlyWorkingMutex);
         currentlyWorking--;
         pthread_cond_signal(&currentlyWorkingCond);
         pthread_mutex_unlock(&currentlyWorkingMutex);
         // Wait for permission to finish
         pthread_mutex_lock(&canFinishMutex);
         while (!canFinish) {
             pthread_cond_wait(&canFinishCond , &canFinishMutex);
         }
         pthread_mutex_unlock(&canFinishMutex);
     }
     pthread_exit(NULL);
 }
 int validateResult(int *ints, int n, int start)
 {
     int tmp = start;
     for (int i = 0; i < n; i++, tmp++) {
         if (ints[i] != tmp) {
             return 0;
         }
     }
     return 1;
 }
 void print(int *ints, int n)
 {
     printf("[");
     for (int i = 0; i < n; i++) {
         printf("%d", ints[i]);
         if (i+1 != n)
             printf(", ");
     }
     printf("]n");
 }

但我不确定pthread_cancel是否足够清理!至于屏障,如果它不局限于@Jeremy提到的一些操作系统,那将是一个很大的帮助。

基准:

我想确保这些条件实际上不会减慢算法的速度,所以我设置了这个基准来比较两种解决方案:

 #include <stdio.h>
 #include <stdlib.h>
 #include <pthread.h>
 #include <unistd.h>
 #include <sys/time.h>
 #include <sys/resource.h>
 #define NUM_OF_THREADS 5
 struct thread_data {
     int start;
     int end;
     int *arr;
 };
 pthread_mutex_t currentlyIdleMutex = PTHREAD_MUTEX_INITIALIZER;
 pthread_cond_t  currentlyIdleCond  = PTHREAD_COND_INITIALIZER;
 int currentlyIdle;
 pthread_mutex_t workReadyMutex = PTHREAD_MUTEX_INITIALIZER;
 pthread_cond_t  workReadyCond  = PTHREAD_COND_INITIALIZER;
 int workReady;
 pthread_cond_t  currentlyWorkingCond = PTHREAD_COND_INITIALIZER;
 pthread_mutex_t currentlyWorkingMutex= PTHREAD_MUTEX_INITIALIZER;
 int currentlyWorking;
 pthread_mutex_t canFinishMutex = PTHREAD_MUTEX_INITIALIZER;
 pthread_cond_t  canFinishCond  = PTHREAD_COND_INITIALIZER;
 int canFinish;
 void *processArrayMutex(void *args);
 void *processArrayJoin(void *args);
 double doItWithMutex(pthread_t *threads, struct thread_data *data, int loops);
 double doItWithJoin(pthread_t *threads, struct thread_data *data, int loops);
 int main(int argc, const char * argv[])
 {
     int numOfInts = 10;
     int *join_ints = malloc(numOfInts * sizeof(int));
     int *mutex_ints = malloc(numOfInts * sizeof(int));
     for (int i = 0; i < numOfInts; i++) {
         join_ints[i] = i;
         mutex_ints[i] = i;
     }
     pthread_t join_threads[NUM_OF_THREADS];
     pthread_t mutex_threads[NUM_OF_THREADS];
     struct thread_data join_thread_data[NUM_OF_THREADS];
     struct thread_data mutex_thread_data[NUM_OF_THREADS];
     workReady = 0;
     canFinish = 0;
     currentlyIdle = 0;
     currentlyWorking = 0;
     int remainingWork = numOfInts, amountOfWork;
     int startRange, endRange = -1;
     for (int i = 0; i < NUM_OF_THREADS; i++) {
         amountOfWork = remainingWork / (NUM_OF_THREADS - i);
         startRange = endRange + 1;
         endRange   = startRange + amountOfWork - 1;
         join_thread_data[i].arr   = join_ints;
         join_thread_data[i].start = startRange;
         join_thread_data[i].end   = endRange;
         mutex_thread_data[i].arr   = mutex_ints;
         mutex_thread_data[i].start = startRange;
         mutex_thread_data[i].end   = endRange;
         pthread_create(&mutex_threads[i], NULL, processArrayMutex, (void *)&mutex_thread_data[i]);
         remainingWork -= amountOfWork;
     }
     int numOfBenchmarkTests = 100;
     int numberOfLoopsPerTest= 1000;
     double join_sum = 0.0, mutex_sum = 0.0;
     for (int i = 0; i < numOfBenchmarkTests; i++)
     {
         double joinTime = doItWithJoin(join_threads, join_thread_data, numberOfLoopsPerTest);
         double mutexTime= doItWithMutex(mutex_threads, mutex_thread_data, numberOfLoopsPerTest);
         join_sum += joinTime;
         mutex_sum+= mutexTime;      
     }
     double join_avg = join_sum / numOfBenchmarkTests;
     double mutex_avg= mutex_sum / numOfBenchmarkTests;
     printf("Join average : %fn", join_avg);
     printf("Mutex average: %fn", mutex_avg);
     double diff = join_avg - mutex_avg;
     if (diff > 0.0)
         printf("Mutex is %.0f%% faster.n", 100 * diff / join_avg);
     else if (diff < 0.0)
         printf("Join  is %.0f%% faster.n", 100 * diff / mutex_avg);
     else
         printf("Both have the same performance.");
     free(join_ints);
     free(mutex_ints);
     return 0;
 }
 // From https://stackoverflow.com/a/2349941/408286
 double get_time()
 {
     struct timeval t;
     struct timezone tzp;
     gettimeofday(&t, &tzp);
     return t.tv_sec + t.tv_usec*1e-6;
 }
 double doItWithMutex(pthread_t *threads, struct thread_data *data, int num_loops)
 {
     double start = get_time();
     int loops = num_loops;
     while (loops-- != 0) {
         // Make sure all of them are ready
         pthread_mutex_lock(&currentlyIdleMutex);
         while (currentlyIdle != NUM_OF_THREADS) {
             pthread_cond_wait(&currentlyIdleCond, &currentlyIdleMutex);
         }
         pthread_mutex_unlock(&currentlyIdleMutex);
         // All threads are now blocked; it's safe to not lock the mutex.
         // Prevent them from finishing before authorized.
         canFinish = 0;
         // Reset the number of currentlyWorking threads
         currentlyWorking = NUM_OF_THREADS;
         // Signal to the threads to start
         pthread_mutex_lock(&workReadyMutex);
         workReady = 1;
         pthread_cond_broadcast(&workReadyCond );
         pthread_mutex_unlock(&workReadyMutex);
         // Wait for them to finish
         pthread_mutex_lock(&currentlyWorkingMutex);
         while (currentlyWorking != 0) {
             pthread_cond_wait(&currentlyWorkingCond, &currentlyWorkingMutex);
         }
         pthread_mutex_unlock(&currentlyWorkingMutex);
         // The threads are now waiting for permission to finish
         // Prevent them from starting again
         workReady = 0;
         currentlyIdle = 0;
         // Allow them to finish
         pthread_mutex_lock(&canFinishMutex);
         canFinish = 1;
         pthread_cond_broadcast(&canFinishCond);
         pthread_mutex_unlock(&canFinishMutex);
     }
     return get_time() - start;
 }
 double doItWithJoin(pthread_t *threads, struct thread_data *data, int num_loops)
 {
     double start = get_time();
     int loops = num_loops;
     while (loops-- != 0) {
         // create them
         for (int i = 0; i < NUM_OF_THREADS; i++) {
             pthread_create(&threads[i], NULL, processArrayJoin, (void *)&data[i]);
         }
         // wait
         for (int i = 0; i < NUM_OF_THREADS; i++) {
             pthread_join(threads[i], NULL);
         }
     }
     return get_time() - start;
 }
 void *processArrayMutex(void *args)
 {
     struct thread_data *data = (struct thread_data *)args;
     int *arr  = data->arr;
     int start = data->start;
     int end   = data->end;
     while (1) {
         // Set yourself as idle and signal to the main thread, when all threads are idle main will start
         pthread_mutex_lock(&currentlyIdleMutex);
         currentlyIdle++;
         pthread_cond_signal(&currentlyIdleCond);
         pthread_mutex_unlock(&currentlyIdleMutex);
         // wait for work from main
         pthread_mutex_lock(&workReadyMutex);
         while (!workReady) {
             pthread_cond_wait(&workReadyCond , &workReadyMutex);
         }
         pthread_mutex_unlock(&workReadyMutex);
         // Do the work
         for (int i = start; i <= end; i++) {
             arr[i] = arr[i] + 1;
         }
         // mark yourself as finished and signal to main
         pthread_mutex_lock(&currentlyWorkingMutex);
         currentlyWorking--;
         pthread_cond_signal(&currentlyWorkingCond);
         pthread_mutex_unlock(&currentlyWorkingMutex);
         // Wait for permission to finish
         pthread_mutex_lock(&canFinishMutex);
         while (!canFinish) {
             pthread_cond_wait(&canFinishCond , &canFinishMutex);
         }
         pthread_mutex_unlock(&canFinishMutex);
     }
     pthread_exit(NULL);
 }
 void *processArrayJoin(void *args)
 {
     struct thread_data *data = (struct thread_data *)args;
     int *arr  = data->arr;
     int start = data->start;
     int end   = data->end;
     // Do the work
     for (int i = start; i <= end; i++) {
         arr[i] = arr[i] + 1;
     }
     pthread_exit(NULL);
 }

输出为:

Join average : 0.153074
Mutex average: 0.071588
Mutex is 53% faster.

再次感谢。我真的很感谢你的帮助!

您可以使用几种同步机制(例如,条件变量)。我认为最简单的方法是使用pthread_barrier来同步线程的开始。

假设您希望所有线程在每次循环迭代中"同步",那么您可以重用屏障。如果您需要更灵活的东西,条件变量可能更合适。

当您决定线程结束时(您还没有指出线程将如何知道如何突破无限循环——可以使用一个简单的共享变量;共享变量可以是原子类型或受互斥体保护),main()线程应该使用pthread_join()来等待所有线程完成。

您需要使用与join不同的同步技术,这一点很清楚。

不幸的是,你有很多选择。一个是"同步屏障",基本上是指每个到达它的线程都会阻塞,直到它们都到达为止(您可以提前指定线程数)。看看pthread_barrier

另一种是使用条件变量/互斥对(pthread_cond_*)。当每个线程完成时,它获取互斥,增加一个计数,并向condvar发出信号。主线程在condvar上等待,直到计数达到它期望的值。代码如下:

// thread has finished
mutex_lock
++global_count
// optional optimization: only execute the next line when global_count >= N
cond_signal
mutex_unlock
// main is waiting for N threads to finish
mutex_lock
while (global_count < N) {
    cond_wait
}
mutex_unlock

另一种是每个线程使用一个信号量——当线程完成时,它发布自己的信号量,主线程依次等待每个信号量,而不是依次加入每个线程。

您还需要同步来重新启动下一个作业的线程——这可能是与第一个类型相同的第二个同步对象,但由于您有1个发布者和N个等待者,而不是相反,因此更改了详细信息。或者,您可以(小心地)将同一个对象用于两个目的。

如果你尝试过这些东西,但你的代码不起作用,也许可以问一个关于你尝试过的代码的新的特定问题。他们都能胜任这项任务。

您在错误的抽象级别上工作。这个问题已经解决了。您正在重新实现一个工作队列+线程池。

OpenMP似乎很适合您的问题。它将#pragma注释转换为线程代码。我相信这会让你非常直接地表达你想要做的事情。

使用libdispatch,您试图做的事情将表示为针对并发队列的dispatch_apply。这隐含地等待所有子任务完成。在OSX下,它是使用不可移植的pthread工作队列接口实现的;在FreeBSD下,我相信它直接管理一组pthread。

如果是可移植性问题驱使您使用原始pthread,那么不要使用pthread屏障。Barriers是在基本POSIX线程之上的一个附加扩展。例如,OSX不支持它。有关更多信息,请参阅POSIX。

阻塞主线程直到所有子线程都完成,可以使用由条件变量保护的计数来完成,或者更简单地说,使用管道和阻塞读取来完成,其中要读取的字节数与线程数匹配。每个线程在工作完成时写入一个字节,然后休眠,直到从主线程获得新的工作。一旦每个线程都写入了"我完成了!"字节,主线程就会解除锁定。

将工作传递给子线程可以使用互斥来保护工作描述符和发出新工作信号的条件。您可以使用所有线程都从中提取的单个工作描述符数组。在发出信号时,每个线程都试图获取互斥对象。在获取互斥体时,它会将一些工作出列,如果队列不空,则重新发出信号,然后处理其工作,然后向主线程发出完成信号。

您可以重用这个"工作队列",通过对结果进行排队来解除对主线程的阻塞,让主线程等待,直到结果队列长度与线程数相匹配;管道方法只是使用一个阻塞CCD_ 11来为您进行计数。

要告诉所有线程开始工作,可以像初始化为零的全局整数变量一样简单,线程只需等待它为非零。这样就不需要在线程函数中使用while (1)循环。

对于等待直到它们全部完成,pthread_join是最简单的,因为它实际上会阻塞,直到它所连接的线程完成。它还需要在线程之后清理系统内容(就像其他情况一样,线程的返回值将为程序的剩余部分存储)。由于线程有一个包含所有pthread_t的数组,所以只需逐个循环即可。由于程序的这一部分不做任何其他事情,并且必须等到所有线程都完成,所以按顺序等待它们是可以的。

相关内容

最新更新