线程池执行在C中的任意点停止



我正在C中实现我自己的通用Threadpool算法,使用fibonacci序列进行测试,在过去的几天里,我一直遇到一个完全困扰我的问题。

当执行程序时,它会一直正常工作,直到在某个时刻突然停止,而我看不出任何明显的原因。

我注意到的一件事是,执行在一小段时间后停止,因为如果添加了打印命令或睡眠命令,它会在执行的早期停止

编辑:错过了这一部分,我已经测试了死锁,但没有,它似乎只是在某个时候没有把任何新东西推到堆栈上,导致所有线程都试图从堆栈中拉出,意识到它是空的,然后跳回来无限重复这个过程。

这是代码:

threadpool.h

#ifndef THREADPOOL_H_INCLUDED
#define THREADPOOL_H_INCLUDED
#include <stddef.h>
#include <stdbool.h>
// Signature of a function the pool can execute: it takes one opaque pointer and returns nothing.
typedef void (*ThreadTask_f)(void*);
// Handle for one unit of work submitted to the thread pool.
// NOTE(review): `fulfilled` is a plain bool that one thread writes and another polls;
// it looks like it should be an atomic type -- confirm before relying on it.
typedef struct Future {
ThreadTask_f fn;   // Function to execute; the pool calls it with a pointer to this Future
bool fulfilled;   // Set to true by the executing thread once fn has returned
} Future;

// Creates the global pool with `size` worker threads; returns 0 on success, -1 on failure.
extern int tpInit(size_t size);
// Cancels and joins the worker threads recorded in the pool, then frees the pool's memory.
extern void tpRelease(void);
// Marks `future` as unfulfilled and queues it for execution by the pool.
extern void tpAsync(Future *future);
// Returns once future->fulfilled is true; while waiting, the caller executes queued tasks itself.
extern void tpAwait(Future *future);
/* Creates an abstraction for easy interaction of functions with the threadpool.
 *
 * TYPE: type that the function returns
 * NAME: name of the function to be parallelised
 * ARG:  type of the (single) argument of the function
 *
 * Expanding TASK(TYPE, NAME, ARG) emits, in this order:
 *   - a prototype                 TYPE NAME(ARG);
 *   - a future type               NAME_fut   { Future fut; ARG arg; TYPE res; }
 *   - a thunk                     NAMEThunk  (calls NAME(arg) and stores the result in res)
 *   - a constructor               NAMEFuture (builds an unfulfilled NAME_fut for one argument)
 *   - a submit helper             NAMEAsync  (hands the future to tpAsync, returns it)
 *   - a wait helper               NAMEAwait  (tpAwait, then returns res)
 *
 * `fut` must remain the first member of NAME_fut: the pool passes the address
 * of the embedded Future to the thunk, which converts it back to NAME_fut *.
 *
 * Every line of the definition ends in a line continuation; without them the
 * macro would expand to nothing and the body would be compiled as stray code.
 */
#define TASK(TYPE, NAME, ARG) \
    TYPE NAME(ARG); \
    \
    typedef struct { \
        Future fut; \
        ARG    arg; \
        TYPE   res; \
    } NAME ## _fut; \
    \
    static void NAME ## Thunk(void *args) { \
        NAME ## _fut *data = args; \
        data->res = NAME(data->arg); \
    } \
    static inline NAME ## _fut NAME ## Future(ARG arg) { \
        return (NAME ## _fut) { \
            .fut = { .fn = &NAME ## Thunk, .fulfilled = false }, \
            .arg = arg \
        }; \
    } \
    static inline NAME ## _fut* NAME ## Async(NAME ## _fut *future) { \
        tpAsync(&future->fut); \
        return future; \
    } \
    static inline TYPE NAME ## Await(NAME ## _fut *future) { \
        tpAwait(&future->fut); \
        return future->res; \
    }
#endif

threadpool.c


#include "threadpool.h"
#include <stdlib.h>
#include <unistd.h>
#include <pthread.h>
#include <stdatomic.h>
#include <stdio.h>
#include <time.h>
// Stack size in bytes requested for every worker thread (8 * 1024 * 1024 = 8 MiB)
#define THREADSTACKSIZE 8388608
#define INITSTACKSIZE 1024  // initial capacity of the task stack, counted in Future pointers
#define STACKMEMMULT 2  // growth factor to apply when the task stack is full

// Growable LIFO stack holding pointers to the futures that still have to be executed.
typedef struct TaskStack {
// Heap array of Future pointers (allocated in tpInit)
Future **start;
// Capacity of `start`, counted in elements
size_t size;
// Index of the topmost task; -1 while the stack is empty
long current;
} TaskStack;
// State of the pool: its worker threads and the stack of pending tasks.
typedef struct ThreadPool {
// Number of worker threads that tpRelease cancels and joins
size_t size;
// Array of worker thread handles, filled by pthread_create in tpInit
pthread_t *threads;
// Stack of pending tasks shared by all threads
TaskStack *stack;
} ThreadPool;
// Mutex held around every access to the task stack
static pthread_mutex_t stackAccess;
// The single pool instance of this translation unit; allocated by tpInit, freed by tpRelease
static ThreadPool *tp;
/**
 * Suspends the calling thread for roughly `nano` nanoseconds.
 *
 * The value is split into whole seconds and a sub-second remainder because
 * nanosleep() rejects a tv_nsec outside 0..999999999 with EINVAL; passing
 * `nano` straight through would make every request of one second or more
 * return immediately without sleeping.
 *
 * @param nano  requested delay in nanoseconds
 *
 * The sleep is not restarted when it is interrupted by a signal.
 */
void nsleep(unsigned long nano) {
    const unsigned long nanosPerSecond = 1000000000UL;
    struct timespec delay = {
        .tv_sec = (time_t)(nano / nanosPerSecond),
        .tv_nsec = (long)(nano % nanosPerSecond)
    };
    nanosleep(&delay, NULL);
}
/**
 * Pushes `future` onto the shared task stack, growing the stack when it is full.
 *
 * The whole operation runs under the stackAccess mutex.
 *
 * @param future  task to enqueue; the stack only stores the pointer
 *
 * If the stack cannot be grown the process is aborted: there is no way to
 * report the failure to the caller and the task must not be silently lost.
 */
static void push(Future *future){
    pthread_mutex_lock(&stackAccess);

    TaskStack *stack = tp->stack;
    /* current is -1 for an empty stack, so current + 1 is never negative. */
    size_t next = (size_t)(stack->current + 1);

    /* Grow BEFORE writing: slot `next` must be inside the allocation. */
    if (next >= stack->size) {
        size_t newSize = stack->size * STACKMEMMULT;
        /* realloc takes a byte count, not an element count. */
        Future **grown = realloc(stack->start, newSize * sizeof *grown);
        if (grown == NULL) {
            perror("push: growing the task stack failed");
            abort();
        }
        stack->start = grown;
        stack->size = newSize;
    }

    stack->start[next] = future;
    stack->current = (long)next;

    pthread_mutex_unlock(&stackAccess);
}
/**
 * Removes and returns the topmost task of the shared stack.
 *
 * Blocks (by polling) until a task is available. Between two attempts the
 * mutex is released, a pending cancellation request is honoured and the
 * processor is yielded so that another thread gets a chance to add tasks.
 *
 * @return the task that was on top of the stack; never NULL
 */
static Future *pull(){
    for (;;) {
        pthread_mutex_lock(&stackAccess);
        if (tp->stack->current != -1) {
            Future *top = tp->stack->start[tp->stack->current];
            tp->stack->current--;
            pthread_mutex_unlock(&stackAccess);
            return top;
        }
        /* Nothing queued: let go of the lock before waiting. */
        pthread_mutex_unlock(&stackAccess);
        pthread_testcancel();
        sched_yield();
    }
}
/**
 * Entry point of every worker thread: repeatedly takes a task from the
 * stack, runs it and marks it as fulfilled.
 *
 * Cancellation is deferred, so the thread only terminates at a cancellation
 * point such as the pthread_testcancel() call after each task.
 *
 * @param args  unused
 * @return never returns normally; the thread ends through cancellation
 */
static void *workerThread(void *args){
    pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL);
    for (;;) {
        Future *task = pull();
        task->fn(task);
        task->fulfilled = true;
        pthread_testcancel();
    }
    return NULL;
}
/**
 * Creates the global thread pool and starts `size` worker threads.
 *
 * @param size  number of worker threads to start
 * @return 0 on success, -1 on failure
 *
 * On failure an error code is written to stderr (0-3: allocations,
 * 4-6: thread attribute setup, 20+i: creation of worker i), every worker
 * that was already started is cancelled and joined, all memory is released
 * and the pool pointer is reset to NULL.
 */
int tpInit(size_t size) {
    int err;
    bool attrReady = false;
    pthread_attr_t attributes;

    tp = NULL;
    pthread_mutex_init(&stackAccess, NULL);

    tp = malloc(sizeof *tp);
    if (tp == NULL) {
        err = 0;
        goto ERRHANDLINIT;
    }
    /* Put the pool into a state the cleanup code can always handle. */
    tp->size = 0;
    tp->threads = NULL;

    tp->stack = malloc(sizeof *tp->stack);
    if (tp->stack == NULL) {
        err = 1;
        goto ERRHANDLINIT;
    }
    tp->stack->start = NULL;
    tp->stack->size = 0;
    tp->stack->current = -1;

    /* calloc checks the multiplication for overflow; never request 0 bytes,
     * because a NULL result for an empty array would look like a failure. */
    tp->threads = calloc(size > 0 ? size : 1, sizeof *tp->threads);
    if (tp->threads == NULL) {
        err = 2;
        goto ERRHANDLINIT;
    }

    tp->stack->start = malloc(sizeof *tp->stack->start * INITSTACKSIZE);
    if (tp->stack->start == NULL) {
        err = 3;
        goto ERRHANDLINIT;
    }
    tp->stack->size = INITSTACKSIZE;

    if (pthread_attr_init(&attributes) != 0) {
        err = 4;
        goto ERRHANDLINIT;
    }
    attrReady = true;
    if (pthread_attr_setstacksize(&attributes, THREADSTACKSIZE) != 0) {
        err = 5;
        goto ERRHANDLINIT;
    }
    if (pthread_attr_setdetachstate(&attributes, PTHREAD_CREATE_JOINABLE) != 0) {
        err = 6;
        goto ERRHANDLINIT;
    }

    for (size_t i = 0; i < size; i++) {
        if (pthread_create(&tp->threads[i], &attributes, workerThread, NULL) != 0) {
            err = 20 + (int)i;
            goto ERRHANDLINIT;
        }
        /* Record each started worker so that exactly these get joined later. */
        tp->size = i + 1;
    }

    pthread_attr_destroy(&attributes);
    return 0;

ERRHANDLINIT:
    /* The pthread functions report errors through their return value and do
     * not set errno, so the numeric code is the reliable piece of information. */
    fprintf(stderr,
            "Problem while initiating the threadpool with the following errcode: %i\n",
            err);
    if (attrReady) {
        pthread_attr_destroy(&attributes);
    }
    if (tp != NULL) {
        /* Stop the workers before the memory they use is released. */
        for (size_t i = 0; i < tp->size; i++) {
            pthread_cancel(tp->threads[i]);
            pthread_join(tp->threads[i], NULL);
        }
        if (tp->stack != NULL) {
            free(tp->stack->start);
        }
        free(tp->stack);
        free(tp->threads);
        free(tp);
        tp = NULL;
    }
    pthread_mutex_destroy(&stackAccess);
    return -1;
}
/**
 * Shuts the pool down: cancels and joins the tp->size recorded worker
 * threads, then frees the task stack, the thread array and the pool itself.
 *
 * Does nothing when no pool exists (tpInit was never called or could not
 * allocate the pool). It must be called at most once per created pool;
 * the pool pointer is not reset afterwards.
 */
void tpRelease(void) {
    if (tp == NULL) {
        return;
    }
    for (size_t i = 0; i < tp->size; i++) {
        pthread_cancel(tp->threads[i]);
        /* Wait for the worker to really stop before its data is freed. */
        pthread_join(tp->threads[i], NULL);
    }
    free(tp->stack->start);
    free(tp->stack);
    free(tp->threads);
    free(tp);
}
/**
 * Submits `future` to the pool.
 *
 * The fulfilled flag is cleared first and the future is then pushed onto the
 * task stack.
 *
 * @param future  task to run; only its address is stored in the task stack
 */
void tpAsync(Future *future) {
    future->fulfilled = false;
    push(future);
}
void tpAwait(Future *future) {
while(!future->fulfilled){
Future *workFut=pull();
workFut->fn(workFut);
workFut->fulfilled=true;
}
}

main.c

#include "threadpool.h"
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>

static TASK(long, fib, long);
/**
 * Computes the n-th Fibonacci number by submitting both recursive calls to
 * the thread pool and adding their results.
 *
 * @param n  index into the sequence; values <= 1 are returned unchanged
 * @return fib(n - 1) + fib(n - 2) for n > 1, otherwise n
 */
long fib(long n) {
    if (n > 1) {
        /* The futures live in compound literals of this block, which stay
         * valid until both results have been awaited below. */
        fib_fut *a = fibAsync((fib_fut[]) { fibFuture(n - 1) });
        fib_fut *b = fibAsync((fib_fut[]) { fibFuture(n - 2) });
        return fibAwait(a) + fibAwait(b);
    }
    return n;
}
/**
 * Test driver: starts a pool with 8 workers, registers tpRelease to run at
 * exit and prints fib(0) .. fib(100), one value per line.
 *
 * NOTE(review): fib(93) no longer fits into a 64-bit long, so the upper end
 * of the range overflows -- confirm the intended bound.
 */
int main() {
    if (tpInit(8) != 0) {
        perror("Thread Pool initialization failed");
        exit(-1);
    }
    atexit(&tpRelease);

    for (long i = 0; i <= 100; ++i)
        printf("fib(%2li) = %li\n", i, fib(i));

    return 0;
}

Makefile

#!/usr/bin/make
# Builds every *.c file of this directory (except quicksort.c) into ./threadpool.
.SUFFIXES:
.PHONY: all run pack clean
SRC = $(wildcard *.c)
OBJ = $(SRC:%.c=%.o)
TAR = threadpool
CFLAGS = -std=gnu11 -c -g -Os -Wall -MMD -MP
LFLAGS = -pthread
DEP = $(OBJ:%.o=%.d)
# Recipe lines below must start with a TAB character.
%.o: %.c
	$(CC) $(CFLAGS) $< -o $@
$(TAR): $(filter-out quicksort.o,$(OBJ))
	$(CC) $(LFLAGS) -o $@ $^
all: $(TAR)
run: all
	./$(TAR)
clean:
	$(RM) $(RMFILES) $(OBJ) $(TAR) bench $(DEP) $(PCK)
# Generated dependency files are read last so that none of their targets
# becomes the default goal in place of $(TAR).
-include $(DEP)

我真的希望你知道发生了什么。提前谢谢。

在 Craig Estey 和 Amit 的热心帮助下,我找到了答案(你可以在原帖子的评论中看到)。

所以最终这确实是一个死锁。我不会修改原始帖子,你仍然可以在其中看到问题所在,这样任何感兴趣的人都有机会看看我犯的错误。

发生这种情况的原因是:在某一时刻,会有 6 个线程在堆栈为空的情况下停在 pull() 中等待;剩下的两个线程中,一个正要进入等待状态,另一个刚刚执行完给定的函数,而该函数没有再递归调用其他任务(在我们的示例中,就是执行 fib(0) 或 fib(1) 的线程)。现在,前一个线程(我们称之为线程7)进入 fib_await(),检查它正在等待的值是否已经完成;此时该值还没有完成,于是它去检查堆栈中是否还有其他任务。由于没有,它被困在等待中。

现在,另一个线程(线程8,即刚刚执行完给定函数的那个线程)将其 future 标记为已完成,并试图获取另一个 future。由于堆栈是空的,它也会一直停留在 pull() 中。

现在,所有线程都被困在 pull() 中,没有一个线程可以继续,因为正在等待另一个线程的那个线程必须先离开 pull()。

我只修改了 pull()、push()、tpAwait()、tpInit() 和 workerThread(),因为我还实现了一个非常简单的票据锁(ticket lock)。

Δ threadpool.c


/**
 * Resets the ticket lock: the next ticket to hand out and the ticket
 * currently being served both start at 0.
 */
static void ticketLockInit(){
    atomic_init(&nextTicket, 0);
    atomic_init(&nowServing, 0);
}
static inline void ticketLockAcquire(){
atomic_long myTicket=atomic_fetch_add(&nextTicket,1);
while(myTicket!=nowServing){
nsleep(1);
}
}
/**
 * Releases the ticket lock by advancing the number being served, which
 * lets the holder of the next ticket proceed.
 */
static inline void ticketLockRelease(){
    atomic_fetch_add(&nowServing, 1);
}
/**
 * Pushes `future` onto the shared task stack, growing the stack when it is full.
 *
 * The whole operation runs while holding the ticket lock.
 *
 * @param future  task to enqueue; the stack only stores the pointer
 *
 * If the stack cannot be grown the process is aborted: there is no way to
 * report the failure to the caller and the task must not be silently lost.
 */
static void push(Future *future){
    ticketLockAcquire();

    TaskStack *stack = tp->stack;
    /* current is -1 for an empty stack, so current + 1 is never negative. */
    size_t next = (size_t)(stack->current + 1);

    if (next >= stack->size) {
        fprintf(stderr, "MemRealloc\n");
        size_t newSize = stack->size * 2;
        /* realloc takes a byte count, not an element count. */
        Future **grown = realloc(stack->start, newSize * sizeof *grown);
        if (grown == NULL) {
            perror("push: growing the task stack failed");
            abort();
        }
        stack->start = grown;
        stack->size = newSize;
    }

    stack->start[next] = future;
    stack->current = (long)next;

    ticketLockRelease();
}
static Future *pull(){
Future *retVal=NULL;
ticketLockAcquire();
if(tp->stack->current>-1){  //if there is nothing on the stack test if there is a cancel attempt and yield the scheduler to a thread that might add tasks.
retVal=tp->stack->start[tp->stack->current];
tp->stack->current--;
}
ticketLockRelease();
return retVal;
}
/**
 * Entry point of every worker thread: polls the task stack, runs each task
 * it obtains and marks it as fulfilled.
 *
 * Cancellation is deferred. pthread_testcancel() is called on every
 * iteration -- also when no task was available -- so that an idle worker
 * still reaches a cancellation point and can be cancelled and joined.
 *
 * @param args  unused
 * @return never returns normally; the thread ends through cancellation
 */
static void *workerThread(void *args){
    (void)args;
    pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL);
    while (true) {
        Future *fut = pull();
        if (fut != NULL) {
            fut->fn(fut);
            fut->fulfilled = true;
        }
        pthread_testcancel();
    }
    return NULL;
}
/**
 * Waits until `future` has been fulfilled.
 *
 * While waiting, the caller polls the task stack and executes any task it
 * obtains; after each executed task a pending cancellation request is
 * honoured. The fulfilled flag is re-checked on every iteration.
 *
 * @param future  future previously submitted with tpAsync
 */
void tpAwait(Future *future) {
    while (!future->fulfilled) {
        Future *other = pull();
        if (other == NULL) {
            continue;
        }
        other->fn(other);
        other->fulfilled = true;
        pthread_testcancel();
    }
}
/**
 * Creates the global thread pool and starts `size` worker threads.
 *
 * @param size  number of worker threads to start
 * @return 0 on success, -1 on failure
 *
 * On failure an error code is written to stderr (0-3: allocations,
 * 4-6: thread attribute setup, 20+i: creation of worker i), every worker
 * that was already started is cancelled and joined, all memory is released
 * and the pool pointer is reset to NULL.
 */
int tpInit(size_t size) {
    int err;
    bool attrReady = false;
    pthread_attr_t attributes;

    tp = NULL;
    pthread_mutex_init(&stackAccess, NULL);
    ticketLockInit();

    tp = malloc(sizeof *tp);
    if (tp == NULL) {
        err = 0;
        goto ERRHANDLINIT;
    }
    /* Put the pool into a state the cleanup code can always handle. */
    tp->size = 0;
    tp->threads = NULL;

    tp->stack = malloc(sizeof *tp->stack);
    if (tp->stack == NULL) {
        err = 1;
        goto ERRHANDLINIT;
    }
    tp->stack->start = NULL;
    tp->stack->size = 0;
    tp->stack->current = -1;

    /* calloc checks the multiplication for overflow; never request 0 bytes,
     * because a NULL result for an empty array would look like a failure. */
    tp->threads = calloc(size > 0 ? size : 1, sizeof *tp->threads);
    if (tp->threads == NULL) {
        err = 2;
        goto ERRHANDLINIT;
    }

    tp->stack->start = malloc(sizeof *tp->stack->start * INITSTACKSIZE);
    if (tp->stack->start == NULL) {
        err = 3;
        goto ERRHANDLINIT;
    }
    tp->stack->size = INITSTACKSIZE;

    if (pthread_attr_init(&attributes) != 0) {
        err = 4;
        goto ERRHANDLINIT;
    }
    attrReady = true;
    if (pthread_attr_setstacksize(&attributes, THREADSTACKSIZE) != 0) {
        err = 5;
        goto ERRHANDLINIT;
    }
    if (pthread_attr_setdetachstate(&attributes, PTHREAD_CREATE_JOINABLE) != 0) {
        err = 6;
        goto ERRHANDLINIT;
    }

    for (size_t i = 0; i < size; i++) {
        if (pthread_create(&tp->threads[i], &attributes, workerThread, NULL) != 0) {
            err = 20 + (int)i;
            goto ERRHANDLINIT;
        }
        /* Record each started worker so that exactly these get joined later. */
        tp->size = i + 1;
    }

    pthread_attr_destroy(&attributes);
    return 0;

ERRHANDLINIT:
    /* The pthread functions report errors through their return value and do
     * not set errno, so the numeric code is the reliable piece of information. */
    fprintf(stderr,
            "Problem while initiating the threadpool with the following errcode: %i\n",
            err);
    if (attrReady) {
        pthread_attr_destroy(&attributes);
    }
    if (tp != NULL) {
        /* Stop the workers before the memory they use is released.
         * NOTE(review): the join assumes every worker reaches a cancellation
         * point while idle -- confirm against workerThread. */
        for (size_t i = 0; i < tp->size; i++) {
            pthread_cancel(tp->threads[i]);
            pthread_join(tp->threads[i], NULL);
        }
        if (tp->stack != NULL) {
            free(tp->stack->start);
        }
        free(tp->stack);
        free(tp->threads);
        free(tp);
        tp = NULL;
    }
    pthread_mutex_destroy(&stackAccess);
    return -1;
}

最新更新