对于一项任务,我用C语言完成了一个快速排序程序的函数顺序版本,但现在我需要通过使用线程的并行化来加快它的速度。下面的修改程序正在排序并使用线程(使用>100%CPU(,但现在按顺序执行时需要比以前更长的时间。
该程序的工作方式是,当第8个线程终止时,该程序现在应该继续按顺序执行程序的其余部分。
我知道这可能不是用线程优化时的可选方式,但这就是这个程序应该如何工作的。我想这和pthread_join
有关,但我试着移动它,但这没有什么区别,有时甚至更糟。
*请注意,我在这里确实包含了数组的初始化,因为它并不相关。
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#define THREADS 8
#define KILO (1024)
#define MEGA (1024*1024) //1 048 576
#define MAX_ITEMS (64 *MEGA) //67 108 864
#define swap(v, a, b) {unsigned tmp; tmp=v[a]; v[a]=v[b]; v[b]=tmp;}
pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
static int *v;
int global_thread = 1; //Starts at 1, because the thread from main is the first
struct threadArgs
{
int low;
int high;
};
static int partition(int *v, int low, int high, int pivot_index)
{
if (pivot_index != low)
swap(v, low, pivot_index);
pivot_index = low;
low++;
while (low <= high)
{
if (v[low] <= v[pivot_index])
low++;
else if (v[high] > v[pivot_index])
high--;
else
swap(v, low, high);
}
if (high != pivot_index)
swap(v, pivot_index, high);
return high;
}
void* quick_sort(void* arguments)
{
struct threadArgs* parent = (struct threadArgs*) arguments;
pthread_t child;
struct threadArgs* child_thread;
child_thread = malloc(sizeof(struct threadArgs));
int pivot_index;
int low = parent->low;
int high = parent->high;
if (low >= high)
return NULL;
pivot_index = (low + high) / 2;
pivot_index = partition(v, low, high, pivot_index);
if(global_thread < THREADS) //should only create 7 new threads, 1 is already from main
{
if(pivot_index < high) //child: right side
{
child_thread->low = pivot_index +1;
child_thread->high = high;
pthread_mutex_lock(&lock);
global_thread++;
pthread_mutex_unlock(&lock);
pthread_create(&child, NULL, quick_sort, (void*) child_thread);
}
if(low < pivot_index) //parent: left side,
{
parent->low = low;
parent->high = pivot_index - 1;
quick_sort((void*) parent);
}
pthread_join(child, NULL);
}
else
{
if (low < pivot_index) //left side sequential
{
parent->low = low;
parent->high = pivot_index - 1;
quick_sort((void*) parent);
}
if (pivot_index < high) //right side sequential
{
parent->low = pivot_index +1;
parent->high = high;
quick_sort((void*) parent);
}
}
return NULL;
}
int main(int argc, char **argv)
{
init_array();
struct threadArgs* args; // main thread
args = malloc(sizeof(struct threadArgs));
args->low = 0;
args->high = MAX_ITEMS - 1;
quick_sort((void *)args);
pthread_mutex_destroy(&lock);
free(args);
return 0;
}
您的代码存在严重的内存泄漏:每次调用quick_sort()
都会为子线程的参数分配内存,即使它最终没有启动新的子线程,而且这些内存都不会被释放。malloc()
相对昂贵,并且根据您的实现,它可能会获取锁,从而造成瓶颈。
您的代码有一个数据竞赛:quick_sort()
在互斥对象或其他同步对象的保护之外读取共享变量global_threads
,该变量也会被您的一些线程修改。因此,所产生的行为是未定义的。
您的代码有一个缺陷,即quick_sort()
函数的global_thread < THREADS
分支只有条件地启动一个新线程,但无条件地尝试加入一个线程。如果没有启动子线程,那么由于在变量child
不确定时读取其值,连接尝试将产生未定义的行为。然而,在使用随机数据的实践中,这种情况不太可能发生。
建议:
-
移动
malloc()
调用,以便仅在实际要创建新线程的情况下进行分配。由于每次分配的数量很小,并且分配的数量也很小,因此您可能可以继续允许内存泄漏,但最好在加入子进程后释放内存。 -
更改决定是否启动子线程的策略。例如,跟踪递归深度,并仅在较浅的深度(两个或三个级别(启动新的子级。这样就不需要共享变量或互斥体,只需要
struct threadArgs
中的一个附加成员。 -
当您还没有启动子线程时,请确保不要尝试加入该子线程。
实现只是一个练习,但我自己对这些修复的实现带来了显著的单线程性能改进,并通过添加更多线程显示出合理(但不太好(的加速:两个线程的加速率约为30%,四个线程的增快率略高于50%。