我正在编写一个程序,以获取一些输入;目录,文件名和一些标志。该程序的目的是搜索给定文件的给定目录。并且在搜索时,如果找到另一个目录,它将打开该目录并继续在此处进行搜索。其中一个标志允许用户选择程序将使用多少个线程来搜索文件。
目录存储在堆栈中,我遇到的问题是线程之间的同步。我目前正在使用静音的等待状态。这意味着如果线程已经等待一定的时间,并且存储目录为空的堆栈将结束。问题是,当仅运行2个线程时,1个线程可能最终完成所有工作,即打开400个目录,另一个目录打开0。
所以我的问题是...如何以更好的方式同步我的线程?也许不使用定时等待条件?线程何时应终止?
#include <sys/stat.h>
#include <sys/types.h>
#include <sys/time.h>
#include <unistd.h>
#include <ctype.h>
#include <stdio.h>
#include <stdlib.h>
#include <dirent.h>
#include <getopt.h>
#include <string.h>
#include <limits.h>
#include <errno.h>
#include <pthread.h>
void search_func(char *path, char *name, int d, int f, int l);
void *thread_func(void *arg);
void push(char *data);
char* pop();
#define MAXLENGTH 1000
#define MAXSIZE 10000
#define WAIT_TIME_SECONDS 0.1
pthread_mutex_t lock;
pthread_cond_t count_threshold_cv;
struct stack
{
char stk[MAXSIZE][MAXLENGTH];
int top;
};
typedef struct stack STACK;
STACK s;
struct arg_keeper {
char **argv;
int argc;
int d;
int f;
int l;
};
int main(int argc, char **argv) {
if(argc < 3) {
fprintf(stderr, "Not enough argumentsn");
return 1;
}
char *xValue = NULL;
int x;
int d = 0;
int f = 0;
int l = 0;
int nrthr = 0;
opterr = 0;
int thread_count = 0;
int directory_exist = 0;
pthread_t tid[1024];
while ((x = getopt(argc, argv, "t:p:")) != -1) {
switch (x) {
case 't':
xValue = optarg;
if (*xValue == 'd') {
d = 1;
} else if (*xValue == 'f') {
f = 1;
} else if (*xValue == 'l') {
l = 1;
}
break;
case 'p':
nrthr = atoi(optarg);
if(nrthr == 0) {
fprintf(stderr, "Invalid thread countn");
return 1;
}
break;
case '?':
if (isprint (optopt))
fprintf(stderr, "Unknown option '-%c'.n",
optopt);
return 1;
default:
abort();
}
}
if (argc >= 3) {
int i;
for (i = optind; i < argc - 1; i++) {
directory_exist = 1;
push(argv[i]);
}
}
if(directory_exist == 0) {
fprintf(stderr, "No directories enteredn");
return 1;
}
struct arg_keeper * arg_struct = malloc(sizeof(*arg_struct));
arg_struct->argv = argv;
arg_struct->argc = argc;
arg_struct->d = d;
arg_struct->f = f;
arg_struct->l = l;
if(pthread_mutex_init(&lock, NULL) != 0) {
fprintf(stderr, "Mutex initialisation failedn");
return 1;
}
if(pthread_cond_init(&count_threshold_cv, NULL) != 0) {
fprintf(stderr, "Condition variable initialisation failedn");
return 1;
}
while(thread_count < nrthr - 1) {
if(pthread_create(&(tid[thread_count++]), NULL, thread_func,
arg_struct) != 0)
fprintf(stderr, "Can't create threadn");
}
if(nrthr!=0)
thread_func(arg_struct);
else
thread_func(arg_struct);
int c;
for(c = 0; c < nrthr; c++) {
pthread_join(tid[c], NULL);
}
pthread_mutex_destroy(&lock);
free(arg_struct);
return 0;
}
void *thread_func(void *arg) {
int dirOpened = 0;
struct arg_keeper arg_struct = *(struct arg_keeper *)arg;
char *data;
pthread_mutex_lock(&lock);
struct timespec ts;
struct timeval tp;
while(1) {
gettimeofday(&tp, NULL);
ts.tv_sec = tp.tv_sec;
ts.tv_nsec = tp.tv_usec * 1000;
ts.tv_sec += WAIT_TIME_SECONDS;
if (pthread_cond_timedwait(&count_threshold_cv, &lock, &ts) == ETIMEDOUT) {
if (s.top) {
data = pop();
pthread_cond_signal(&count_threshold_cv);
dirOpened++;
search_func(data, arg_struct.argv[arg_struct.argc - 1], arg_struct.d,
arg_struct.f, arg_struct.l);
}
else
break;
}
}
pthread_mutex_unlock(&lock);
fprintf(stdout, "Thread with id %lu opened %d directoriesn",
pthread_self(), dirOpened);
return NULL;
}
void search_func(char *inPath, char *testName, int d, int f, int l) {
char path[PATH_MAX];
strcpy(path, inPath);
struct dirent *pDirent;
DIR *pDir;
struct stat file_info;
if ((pDir = opendir(path)) == NULL) {
fprintf(stderr, "Error:'%s': %sn", path, strerror(errno));
} else {
int v1;
int v2;
char *str1 = ".";
char *str2 = "..";
char name[PATH_MAX];
strcpy(name, testName);
char testPath[PATH_MAX];
strcpy(testPath, path);
char testPathLast[PATH_MAX];
strcpy(testPathLast, path);
while ((pDirent = readdir(pDir)) != NULL) {
if (strcmp(pDirent->d_name, name) == 0 && d == 0 &&
f == 0 && l == 0) {
if (path[strlen(path) - 1] != '/')
strcat(testPathLast, "/");
strcat(testPathLast, pDirent->d_name);
fprintf(stdout, "%sn", testPathLast);
}
char testPath2[PATH_MAX];
strcpy(testPath2, testPath);
strcat(testPath2, "/");
strcat(testPath2, pDirent->d_name);
if (lstat(testPath2, &file_info) != 0)
fprintf(stderr, "lstat error2: %sn",
strerror(errno));
if (d == 1) {
if (strcmp(pDirent->d_name, name)
== 0 && S_ISDIR(file_info.st_mode)) {
if (path[strlen(path) - 1] != '/')
strcat(testPathLast, "/");
strcat(testPathLast, pDirent->d_name);
fprintf(stdout, "%sn", testPathLast);
}
}
if (f == 1) {
if (strcmp(pDirent->d_name, name)
== 0 && S_ISREG(file_info.st_mode)) {
if (path[strlen(path) - 1] != '/')
strcat(testPathLast, "/");
strcat(testPathLast, pDirent->d_name);
fprintf(stdout, "%sn", testPathLast);
}
}
if (l == 1) {
if (strcmp(pDirent->d_name, name)
== 0 && S_ISLNK(file_info.st_mode)) {
if (path[strlen(path) - 1] != '/')
strcat(testPathLast, "/");
strcat(testPathLast, pDirent->d_name);
fprintf(stdout, "%sn", testPathLast);
}
}
v1 = strcmp(pDirent->d_name, str1);
v2 = strcmp(pDirent->d_name, str2);
if ((v1 != 0 && v2 != 0) && S_ISDIR(file_info.st_mode)) {
strcpy(path, testPath);
strcpy(path, testPath);
if (path[strlen(path) - 1] != '/')
strcat(path, "/");
strcat(path, pDirent->d_name);
push(path);
}
}
closedir(pDir);
}
}
void push(char *data)
{
if(s.top == (MAXSIZE - 1)) {
fprintf(stderr, "Stack is fulln");
return;
}
else {
s.top = s.top + 1;
strcpy(&(s.stk[s.top][0]), data);
}
return;
}
char* pop()
{
char *data;
if(s.top == -1) {
fprintf(stderr, "Stack is emptyn");
return NULL;
}
else {
data = s.stk[s.top];
s.top = s.top - 1;
}
return data;
}
使用OP的实现更好地使用POSIX nftw()
或BSD fts
(两者都可以在Linux的Glibc中提供),此实现中的基本问题实际上很有趣:每个问题都非常有趣:每个问题工作线程最初消耗一个基准,然后处理一段时间,并可能产生其他基准。
问题案例是消耗所有现有基准的情况,但是有一个或多个工作线程,可能会产生其他基准。因此,没有更多的基准要处理,这是工人线程退出的原因。工作线程只能在没有更多的数据进行处理时退出,,并且没有可以生成运行的其他数据。
的工作线程。显而易见的解决方案是使用MUTEX,条件变量(用于等待新数据)和当前正在运行的工作线程数量的计数器。
假设基准存储在单连接的列表中:
struct work_item {
struct work_item *next;
char path[];
};
上面的path
成员是C99灵活数组成员。我们可以用来描述要完成的工作的结构可以是
struct work {
pthread_mutex_t mutex;
pthread_cond_t cond;
long active;
struct work_item *item;
};
#define WORK_INITIALIZER {
PTHREAD_MUTEX_INITIALIZER,
PTHREAD_COND_INITIALIZER,
0L, NULL }
将初始项目推向item
列表后,将创建一个或多个线程,给定指向共享struct work
结构的指针。
逻辑类似于以下内容:
void *worker_thread(void *work_ptr)
{
struct work *const work = (struct work *)word_ptr;
struct work_item *item;
pthread_mutex_lock(&(work->mutex));
while (1) {
/* If there are no active workers,
nor any work items, we're done. */
if (!work->item && !work->active) {
/* Ensure threads waiting on the condition
variable are woken up, so they quit too. */
pthread_cond_broadcast(&(work->cond));
pthread_mutex_unlock(&(work->mutex));
return NULL;
}
/* No work items left? */
if (!work->item) {
/* Wait for a new one to be produced,
or a worker to notice we're done. */
pthread_cond_wait(&(work->cond), &(work->mutex));
continue;
}
/* Increment active worker count, grab an item,
and work on it. */
work->active++;
item = work->item;
work->item = work->item->next;
item->next = NULL;
/* Unlock mutex while working. */
pthread_mutex_unlock(&(work->mutex));
/*
* TODO: Work on item
*/
pthread_mutex_lock(&(work->mutex));
work->active--;
}
}
当然,当从项目上工作时,将项目推到堆栈时必须重新读取互斥品,并在条件变量上发出信号以唤醒工作人员线程(如果正在等待新工作):
struct work_item *temp;
/* TODO: Allocate and initialize temp */
pthread_mutex_lock(&(work->mutex));
temp->next = work->item;
work->item = temp;
pthread_cond_signal(&(work->cond));
pthread_mutex_unlock(&(work->mutex));
注意活动计数器如何反映当前在项目上工作的线程数(并且基本上是当前正在运行的生产者的数量)。它是不是现有工作线程的数量!
如果线程注意到没有更多的项目可以使用,也没有任何生产者运行,则该条件变量是广播工作)。每当将项目添加到工作列表中时,条件变量也会发出信号(仅唤醒一个等待线程)。
如何以更好的方式同步线程?也许不使用定时等待条件?
是的 - 我会丢弃条件变量,并使用一组两个信号量,第一个信号量计数堆栈上的待处理目录以及第二个信号量计数繁忙的工作线程。为了使工作线程保持简单,我将在创建的线程中进行所有搜索工作,而不是从main()
调用thread_func()
。可以保留pthread_mutex_t lock
以保护STACK s
免受并发访问。
线程应何时终止?
正如名义动物写的那样:一个工作线程只能在没有更多基准处理的情况下退出,并且没有可以生成其他基准运行的工作线程。上述信号量并允许在main()
线程中轻松等待该条件。
您程序的更改将为
-
在文件范围中:
#include <sys/sem.h> int semid;
-
在
push()
中:void push(char *data) { pthread_mutex_lock(&lock); if (s.top == MAXSIZE-1) fprintf(stderr, "Stack is fulln"); else strcpy(s.stk[++s.top], data), semop(semid, &(struct sembuf){0, 1}, 1); // add 1 to "dirs on stack" pthread_mutex_unlock(&lock); return; }
请注意,我们在这里计算第一个信号。
-
在
pop()
中:char *pop() { char *data; pthread_mutex_lock(&lock); if (s.top == -1) fprintf(stderr, "Stack is emptyn"), data = NULL; else data = strdup(s.stk[s.top--]); // Don't return freed stack slot! pthread_mutex_unlock(&lock); return data; }
请注意,我们返回数据的副本,而不仅仅是指向数据的指针,因为可以随时被另一个线程重复使用和覆盖。
-
在
main()
中,在将目录参数推到堆栈上之前:// create semaphore set of 2 sems: [0] dirs on stack, [1] threads at work semid = semget(IPC_PRIVATE, 2, S_IRWXU); semctl(semid, 0, SETALL, (unsigned short [2]){}); // zero the sem values
请注意,必须将其放置在第一次致电
push()
之前,以便可以计算信号量。 -
在
main()
中创建线程并称为thread_func()
:while (thread_count < nrthr) if (pthread_create(&tid[thread_count++], NULL, thread_func, arg_struct)) fprintf(stderr, "Can't create threadn"); // wait until no more dirs on stack and no more threads at work semop(semid, (struct sembuf []){{0, 0}, {1, 0}}, 2); semctl(semid, 0, IPC_RMID); // remove the semaphores, make threads finish
请注意,我们创建
nrthr
而不是nrthr - 1
线程,因为main()
线程不参与工作,它只需等待所有正在完成的工作即可。然后,它破坏了信号量的集合,从而导致工作线程摆脱循环并完成(请参见下文)。 -
在
thread_func()
中:void *thread_func(void *arg) { int dirOpened = 0; struct arg_keeper arg_struct = *(struct arg_keeper *)arg; char *data; // wait for work, subtract 1 from dirs on stack and add 1 to threads at work while (semop(semid, (struct sembuf []){{0, -1}, {1, 1}}, 2) == 0) { // this loop ends when semid is removed data = pop(); dirOpened++; search_func(data, arg_struct.argv[arg_struct.argc-1], arg_struct.d, arg_struct.f, arg_struct.l); free(data); semop(semid, &(struct sembuf){1, -1}, 1); // "threads at work" -= 1 } fprintf(stdout, "Thread with id %lu opened %d directoriesn", pthread_self(), dirOpened); return (void *)dirOpened; }
请注意,
semop()
返回 -1 当main()
破坏了信号量集时,循环结束了。另请注意,我们释放了已在pop()
中分配的数据副本。