在C上用pthreads实现barrier



我正在尝试并行化合并排序算法。我所做的是为每个线程划分输入数组,然后合并线程结果。我试图合并结果的方式是这样的:

thread 0                     |   thread 1        |   thread 2         |   thread 3
sort(A0)                     |   sort(A1)        |   sort(A2)         | sort(A3)
merge(A0,A1)                 |                   |   merge(A2,A3)     | 
merge(A0A1, A2A3)            |                   |                    |

因此,在函数sortManager的末尾,我调用应该实现上述逻辑的函数mergeThreadResults。在它中,我遍历对以合并相应的线程。然后,如果需要,我将最后的项目合并到线程0上。它看起来像这样:

void mergeThreadResults(long myRank, int myLeft, int myRight, int size, int threads) {
int nextThread;
int iter = 2;
while (iter <= threads) {
int nextThread = (myRank+1*iter) < threads ? (myRank+1*iter) : threads;
int nextThreadRight = nextThread * ((float)size / (float)threads) - 1;
printf("Merging threads %ld to %dn", myRank, nextThread);

if (myRank % iter != 0) {
break;
}
merge(sortingArray, myLeft, myRight, nextThreadRight);
sleep(3); // <- sleep
myRight = nextThreadRight;
iter = iter * 2;
}
if (myRank == 0 && nextThread < threads-1) {
int nextThreadRight = threads * ((float)size / (float)threads) - 1;
merge(sortingArray, myLeft, myRight, nextThreadRight);
}
}

它似乎按预期工作。问题是,我使用sleep函数来同步线程,这远不是最好的方法。因此,我试图用pthread实现一个屏障
在它中,我试图计算该周期需要多少次迭代,并将其作为breakpoint传递。当所有线程都在同一点时,我释放merge函数,并在新周期中再次等待。这就是我尝试过的:

pthread_mutex_lock(&mutex);
counter++;
int breakpoint = threads % 2 == 0 ? threads/iter : threads/iter+1;
if(counter >= breakpoint ) {
counter = 0;
pthread_cond_broadcast(&cond_var);
} else {
while (pthread_cond_wait(&cond_var, &mutex) != 0);
}
pthread_mutex_unlock(&mutex);

但它并没有按预期工作。在最后一个周期完全结束之前,一些merge触发器,给我留下了一个部分排序的数组。

这是我的测试代码的一个小例子:

#define _GNU_SOURCE
#include <stdio.h>
#include <stdlib.h>
#include <limits.h>
#include <string.h>
#include <time.h>
#include <pthread.h>
#include <unistd.h>
// Initialize global variables
int sortingArray[20] = {5,-4,3,-1,-2,3,1,2,-2,-1,-2,-1,-2,-3,4,1234,534,123,87,123};
int counter = 0;
pthread_mutex_t mutex;
pthread_cond_t cond_var;
struct ThreadTask {
long rank;
int size;
int threads;
};
void merge(int arr[], int left, int mid, int right) {
/* Merge arrays */
int i, j, k;
int n1 = mid - left + 1;
int n2 = right - mid;
// Alocate temp arrays
int *L = malloc((n1 + 2) * sizeof(int));
int *R = malloc((n2 + 2) * sizeof(int));
if (L == NULL || R == NULL) {
fprintf(stderr, "Fatal: failed to allocate memory fo temp arrays.");
exit(EXIT_FAILURE);
}
// Populate temp arrays
for (i = 1; i <= n1; i++) {
L[i] = arr[left + i - 1];
}
for (j = 1; j <= n2; j++) {
R[j] = arr[mid + j];
}
L[n1 + 1] = INT_MAX;
R[n2 + 1] = INT_MAX;
i = 1;
j = 1;
// Merge arrays
for (k = left; k <= right; k++) {
if (L[i] <= R[j]) {
arr[k] = L[i];
i++;
} else {
arr[k] = R[j];
j++;
}
}
free(L);
free(R);
}

void mergeSort(int arr[], int left, int right) {
/* Sort and then merge arrays */
if (left < right) {
int mid = left + (right - left) / 2;
mergeSort(arr, left, mid);
mergeSort(arr, mid + 1, right);
merge(arr, left, mid, right);
}
}

void mergeThreadResults(long myRank, int myLeft, int myRight, int size, int threads) {
int nextThread;
int iter = 2;
while (iter <= threads) {
int nextThread = (myRank+1*iter) < threads ? (myRank+1*iter) : threads;
int nextThreadRight = nextThread * ((float)size / (float)threads) - 1;
printf("Merging threads %ld to %dn", myRank, nextThread);

if (myRank % iter != 0) {
break;
}
// barrier
pthread_mutex_lock(&mutex);
counter++;
int breakpoint = threads % 2 == 0 ? threads/iter : threads/iter+1;
if(counter >= breakpoint ) {
counter = 0;
pthread_cond_broadcast(&cond_var);
} else {
while (pthread_cond_wait(&cond_var, &mutex) != 0);
}
pthread_mutex_unlock(&mutex);
merge(sortingArray, myLeft, myRight, nextThreadRight);
sleep(2); // <- sleep
myRight = nextThreadRight;
iter = iter * 2;
}
if (myRank == 0 && nextThread < threads-1) {
int nextThreadRight = threads * ((float)size / (float)threads) - 1;
merge(sortingArray, myLeft, myRight, nextThreadRight);
}
}
void *sortManager(void *threadInfo) {
/* Manage mergeSort between threads */
struct ThreadTask *currentTask = threadInfo;
// Get task arguments
long rank = currentTask->rank;
int left= rank * ((float)currentTask->size / (float)currentTask->threads);
int right = (rank + 1) * ((float)currentTask->size / (float)currentTask->threads) - 1;
int mid = left + (right - left) / 2;
// Execute merge for task division
if (left < right) {
mergeSort(sortingArray, left, mid);
mergeSort(sortingArray, mid + 1, right);
merge(sortingArray, left, mid, right);
}
// Merge thread results
if (rank % 2 == 0)  {
mergeThreadResults(rank, left, right, currentTask->size, currentTask->threads);
}
return 0;
}

struct ThreadTask *threadCreator(int size, int threads, pthread_t *thread_handles, struct ThreadTask *tasksHolder) {
/* Create threads with each task info */
struct ThreadTask *threadTask;
for (long thread = 0; thread < threads; thread++){
threadTask = &tasksHolder[thread];
threadTask->rank = thread;
threadTask->size = size;
threadTask->threads = threads;
pthread_create(&thread_handles[thread], NULL, sortManager, (void*) threadTask);
}
return tasksHolder;
}

void printArray(int arr[], int size) {
/* Print array */
for (int arrayIndex = 0; arrayIndex < size; arrayIndex++)
printf("%d ", arr[arrayIndex]);
printf("n");
}

int main(int argc, char *argv[]) {
// Initialize arguments
int arraySize = 20;
int totalThreads = 16;

// Display input
printf("nInput array:n");
printArray(sortingArray, arraySize);

// Initialize threads
pthread_t *thread_handles;
thread_handles = malloc(totalThreads * sizeof(pthread_t));
// Create threads
struct ThreadTask threadTasksHolder[totalThreads];
*threadTasksHolder = *threadCreator(arraySize, totalThreads, thread_handles, threadTasksHolder);

// Execute merge sort in each thread
for (long thread = 0; thread < totalThreads; thread++) {
pthread_join(thread_handles[thread], NULL);
}
free(thread_handles);

// Display output
printf("nSorted array:n");
printArray(sortingArray, arraySize);

return 0;
}

正如@John Bollinger所说,您的方法看起来不必要地困难,解决方案也同样复杂。但如果你想实现一个屏障,我建议你把它放在mergeThreadResults中的merge之后。这样,您就可以等待所有在该循环中执行工作的线程变为finnish,然后再转到下一个循环。

要创建它,您需要在每次迭代中通过一个新的屏障。因为在每个循环中,执行合并的线程数量都会减少。因此,开始宣布一些全球壁垒:

int mergeCycleFlag = 0;
pthread_mutex_t mutex;
pthread_barrier_t *mergeBarrier;

该标志用于为每个迭代创建一个屏障,并且我们需要为每个循环创建多个mergeBarrier。不要忘记在main函数中初始化它,使用您将要执行的迭代次数:mergeBarrier = realloc(mergeBarrier, howManyIterations);

然后我们可以创建这样一个屏障:

        pthread_mutex_lock(&mutex);
        if (mergeCycleFlag != iter) { 
            mergeCycleFlag = iter;
            int mergesInLoop = threads % iter== 0 ? threads/iter: threads/iter+1;
            pthread_barrier_init(&mergeBarrier[iter], NULL, mergesInLoop);
        }
        pthread_mutex_unlock(&mutex);
        ... MERGE ...
        // Wait everyone finnish merging
        pthread_barrier_wait (&mergeBarrier[iter]);

请注意,我使用lock来创建屏障,因为我们不希望两个线程同时在这里捣乱。如果没有为这个iter设置屏障,我们将创建一个具有现在应该工作的线程数的屏障。此外,我已经更改了您的breakpoint语句,以适应我们期望执行merge的线程数的计算。

经过一些调整后,mergeThreadResults应该是这样的:

void mergeThreadResults(long myRank, int myLeft, int myRight, int size, int threads) {
    
    int nextThread, nextThreadRight;
    int groupSize = 2;
    while (groupSize <= threads) {
        if (myRank % groupSize != 0) { // Release threads that no long perform merges
            break;
        }
        nextThread = (myRank+1*groupSize) < threads ? (myRank+1*groupSize) : threads;
        nextThreadRight = nextThread * ((float)size / (float)threads) - 1;
 
        printf("Merging threads %ld to %dn", myRank, nextThread-1);
        // Init barrier with number of threads you will wait merging 
        pthread_mutex_lock(&mutex);  // Just one thread can set the barrier
        if (mergeCycleFlag != groupSize) { 
            mergeCycleFlag = groupSize;
            int mergesInLoop = threads % groupSize == 0 ? threads/groupSize : threads/groupSize+1; // Calculate threads working in this step
            pthread_barrier_init(&mergeBarrier[groupSize], NULL, mergesInLoop);  // set barrier
        }
        pthread_mutex_unlock(&mutex);
        // Merge thread group with neighbour group
        merge(sortingArray, myLeft, myRight, nextThreadRight);
        // Wait everyone finnish merging
        pthread_barrier_wait (&mergeBarrier[groupSize]);
        myRight = nextThreadRight;
        groupSize = groupSize * 2;
    }
    // Merge thread 0
    if (myRank == 0 && nextThread < threads-1) {
        nextThreadRight = threads * ((float)size / (float)threads) - 1;
        merge(sortingArray, myLeft, myRight, nextThreadRight);
    }
}

最后,为了让这个解决方案发挥作用,您需要每个线程在合并结果之前都完成了它们的工作。因此,您需要在main中的join之后调用它,或者在sortManager上调用mergeThreadResults之前用所有线程实现另一个屏障。

此外,更好的方法是线程只等待它们将合并的其他线程。类似地,线程0只等待1。然后是2…等等

我正在尝试并行化合并排序算法。我正在做的是为每个线程划分输入数组,然后合并线程后果

好的,但你的方法不必要地困难。在合并过程的每一步,您都希望一半的线程等待另一半完成,而一个线程等待另一个线程完成的最自然的方式是使用pthread_join()。如果你想让所有线程在同步后继续做更多的工作,那就不一样了,但在这种情况下,那些不负责任何合并的线程根本没有什么可做的了

这就是我尝试过的:

pthread_mutex_lock(&mutex);
counter++;
int breakpoint = threads % 2 == 0 ? threads/iter : threads/iter+1;
if(counter >= breakpoint ) {
counter = 0;
pthread_cond_broadcast(&cond_var);
} else {
while (pthread_cond_wait(&cond_var, &mutex) != 0);
}
pthread_mutex_unlock(&mutex);

这有几个问题,但最大的问题是屏障是不适合工作的工具。在障碍物达到顶峰后,所有被堵塞的螺纹都会继续。你希望一半的线程继续执行合并,但其他线程(应该)没有更多的工作要做。你对breakpoint的计算假设后半部分不会返回到屏障,而他们实际上不应该这样做。如果你坚持使用屏障,那么没有合并要执行的线程应该在通过屏障后终止。

此外,在2处启动iter是不正确的。如果使用屏障方法,则所有在每次迭代中活动的线程都必须在继续之前到达屏障,但如果iter从2开始,则在第一次迭代中,只有一半的线程必须在通过屏障之前到达屏障。

此外,你的简历使用不习惯,容易出现问题。pthread_cond_wait()的任何记录的失败原因都无法通过尝试再次等待来挽救,因此您可能需要在出现错误时终止程序。还要注意,pthread_mutex_lock()pthread_mutex_unlock()pthread_cond_broadcast()也可能全部失败。

另一方面,CV容易受到(非常罕见的)虚假唤醒的影响,因此在等待成功返回后,您需要在继续之前再次检查条件,并可能再次等待。更像这样的东西:

if (pthread_mutex_lock(&mutex) != 0) {
perror("pthread_mutex_lock");
abort();
}
counter++;
int breakpoint = threads % 2 == 0 ? threads/iter : threads/iter+1;
if(counter >= breakpoint ) {
counter = 0;
if (pthread_cond_broadcast(&cond_var) != 0) {
perror("pthread_cond_broadcast");
abort();
}
} else {
do {
if (pthread_cond_wait(&cond_var, &mutex) != 0) {
perror("pthread_cond_wait");
abort();
}
} while (counter < breakpoint);
}
if (pthread_mutex_unlock(&mutex) != 0) {
perror("pthread_mutex_unlock");
abort();
}
// some threads must terminate at this point

相关内容

  • 没有找到相关文章

最新更新