c-多线程Fibonacci对程序



我正在尝试编写一个程序,该程序创建两个线程:"前端"one_answers"后端"线程。我想创建一个"后端"线程来迭代和计算fibonacci序列中的项对,并将它们放在一个数组中,以及一个"前端"线程,它将在每次迭代时打印出数组对。

"前端"线程-用于显示每次迭代中"后端"线程操作的结果

"后端"线程-用于计算和设置数组

即。[5,8],迭代后将包含[13,21]

我正在努力在线程中实现Fibonacci序列部分,我已经取得了以下进展:

#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <errno.h>
int fib;
void *front_end(void *ptr);
void *back_end(void *ptr);
int main() {
pthread_t thread1, thread2;
int arr[2] = {5,8};
const int *ptrtoarr;
ptrtoarr=arr;
int create1, create2;
int *s=(int *)(ptrtoarr);
printf("%d n", *s);
ptrtoarr++;
s = (int *)(ptrtoarr);
printf("%d n", *s);
ptrtoarr--;
create1 = pthread_create(&thread1, NULL, back_end, &arr);
if(create1) {
fprintf(stderr,"Error - pthread_create() return code: %dn",create1);
exit(EXIT_FAILURE);
}
pthread_join(thread1, NULL);
//pthread_join(thread2, NULL);
}
// front-end thread to be callback for each back-end iteration
void *front_end(void *ptr) {
int *sum = ptr;
int i, upper = atoi(ptr);
if (upper > 0) {
for (i=0; i<upper; i++){
//Print the fib pairs
}
}
pthread_exit(0);
}
void *back_end(void *ptr) {
int i, upper = atoi(ptr);
fib=1;
if(upper > 0) {
int pre1 = 0;
int current;
//calc fib numbers.....
if(fib == 1){
printf("")
}
}
}

有人能指导我如何处理这个问题吗?

您的骨架需要工作。

假设如下:

unsigned n = ...;      // How many to generate.
unsigned n_ready = 2;  // How many are ready to print.
unsigned *fibs = malloc(sizeof(unsigned)*n);
fibs[0] = 0;
fibs[1] = 1;

在你的后端工作者的核心,你将有

for (unsigned i=2; i<n; ++i) {
fibs[i] = fibs[i-2] + fibs[i-1];
n_ready = i+1;
}

作为前端员工的核心,您将拥有

for (unsigned i=0; i<n; ++i) {
while (i >= n_ready)
/* Nothing */;
printf("%un", fibs[i]);
}

问题#1

如果一个线程在另一个线程写入变量时试图读取该变量,就会遇到问题。两个或多个线程同时读取同一变量是可以的。

两个线程使用的变量都是n,即fib[]n_ready的元素。

  • n:
    两个线程都没有更改,所以我们不需要控制对它的访问。

  • fib[i]对于i >= n_ready
    仅由后端工作程序访问,因此我们不需要控制对这些的访问。

  • fib[i]fori < n_ready:
    仅由前端工作人员访问,因此我们不需要控制对这些的访问。

  • n_ready:
    后端工作人员可以随时设置n_ready,前端工作人员可以尝试随时读取n_ready,因此我们确实需要控制对n_ready的访问。

Mutex通常用于确保一次只有一个线程访问资源(例如一个变量、一组变量、文件句柄等(。

我们的后端员工成为

for (unsigned i=2; i<n; ++i) {
// The mutex only protects n_ready
// --nothing else is going to touch fib[i-2] or fib[i-1] or fib[i]--
// so we don't need to obtain a lock yet.
fibs[i] = fibs[i-2] + fibs[i-1];
// We need to access n_ready.
pthread_mutex_lock(&mutex);
n_ready = i+1;
pthread_mutex_unlock(&mutex);
}

我们的前端员工成为

for (unsigned i=0; i<n; ++i) {
// We need to access n_ready.
pthread_mutex_lock(&mutex);
while (i >= n_ready) {
// Allow other thread to gain the lock.
pthread_mutex_unlock(&mutex);
// We need to access n_ready.
pthread_mutex_lock(&mutex);
}
// The mutex only protects n_ready
// --nothing is going to change fib[i]--
// so we can release it now rather than later.
pthread_mutex_unlock(&mutex);
printf("%un", fibs[i]);
}

问题#2

你有一个繁忙的循环。一般来说,这是不好的,因为这意味着您的线程正在使用100%的等待。(在这种特殊情况下,由于i >= n_ready可能已经是真的,所以这实际上是一个很好的策略。但让我们忽略这一点。(一个线程可以休眠,直到另一个线程使用条件变量发出信号。

我们的后端员工成为

for (unsigned i=2; i<n; ++i) {
// The mutex only protects n_ready
// --nothing else is going to touch fib[i-2] or fib[i-1] or fib[i]--
// so we don't need to obtain a lock yet.
fibs[i] = fibs[i-2] + fibs[i-1];
// We need to access n_ready.
pthread_mutex_lock(&mutex);
n_ready = i+1;
// Wake up the other thread if it's blocked.
pthread_cond_signal(&cond);
pthread_mutex_unlock(&mutex);
}

我们的前端员工成为

for (unsigned i=0; i<n; ++i) {
// We need to access n_ready.
pthread_mutex_lock(&mutex);
while (i >= n_ready)
pthread_cond_wait(&cond, &mutex);
// The mutex only protects n_ready
// --nothing is going to change fib[i]--
// so we can release it now rather than later.
pthread_mutex_unlock(&mutex);
printf("%un", fibs[i]);
}

始终在锁定的互斥对象上调用pthread_cond_wait。它将在被调用时解锁互斥锁,并在返回之前锁定它。这允许另一个线程获得互斥,以便更改n_ready


完整代码:

#include <errno.h>
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#define UNUSED(x) (void)(x)

// To control access to n_ready.
static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
static unsigned n_ready = 0;  // How many are ready to print.
static unsigned n;  // How many to generate.
static unsigned *fibs = NULL;

static void *back_worker(void *unused) {
UNUSED(unused);
fibs[0] = 0;
fibs[1] = 1;
// We need to access n_ready.
pthread_mutex_lock(&mutex);
n_ready = 2;
// Wake up the other thread if it's blocked.
pthread_cond_signal(&cond);
pthread_mutex_unlock(&mutex);
for (unsigned i=2; i<n; ++i) {
// The mutex only protects n_ready
// --nothing is going to touch fib[i]--
// so we don't need to obtain a lock yet.
fibs[i] = fibs[i-2] + fibs[i-1];
// We need to access n_ready.
pthread_mutex_lock(&mutex);
n_ready = i+1;
// Wake up the other thread if it's blocked.
pthread_cond_signal(&cond);
pthread_mutex_unlock(&mutex);
}
return NULL;
}

static void *front_worker(void *unused) {
UNUSED(unused);
for (unsigned i=0; i<n; ++i) {
// We need to access n_ready.
pthread_mutex_lock(&mutex);
while (i >= n_ready)
pthread_cond_wait(&cond, &mutex);
// The mutex only protects n_ready
// --nothing is going to change fib[i]--
// so we can release it now rather than later.
pthread_mutex_unlock(&mutex);
printf("%un", fibs[i]);
}
return NULL;
}

int main(void) {
n = 20;  // How many to generate.
fibs = malloc(sizeof(unsigned) * n);
pthread_t back_thread;
if (errno = pthread_create(&back_thread, NULL, back_worker, NULL)) {
perror(NULL);
exit(1);
}
pthread_t front_thread;
if (errno = pthread_create(&front_thread, NULL, front_worker, NULL)) {
perror(NULL);
exit(1);
}
pthread_join(back_thread, NULL);
pthread_join(front_thread, NULL);
pthread_cond_destroy(&cond);
pthread_mutex_destroy(&mutex);
free(fibs);
return 0;
}

输出:

$ gcc -Wall -Wextra -pedantic a.c -o a -lpthread && a
0
1
1
2
3
5
8
13
21
34
55
89
144
233
377
610
987
1597
2584
4181

应用上述的练习建议

创建一个工作人员池,打印出放入队列中的数字。输出不需要有序。

辅助函数已经为您编写您不能更改mainworker功能我甚至为您创建了队列。您只需通过修改Queue_enqueueQueue_dequeueQueue_done函数使其线程安全即可这些是您可以更改的唯一功能

#include <errno.h>
#include <inttypes.h>
#include <pthread.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <time.h>
#define NUM_WORKERS 4
#define QUEUE_SIZE 10
#define NUM_ITEMS 40
typedef struct {
pthread_mutex_t mutex;
pthread_cond_t cond;
int done;
int empty;
int full;
size_t max;
size_t next_insert;
size_t next_read;
unsigned *buf;
} Queue;
static void Queue_init(Queue* q, size_t max) {
pthread_mutex_init(&(q->mutex), NULL);
pthread_cond_init(&(q->cond), NULL);
q->done = 0;
q->empty = 1;
q->full = 0;
q->max = max;
q->next_insert = 0;
q->next_read = 0;
q->buf = malloc(sizeof(unsigned)*max);
}
static void Queue_destroy(Queue *q) {
free(q->buf);
pthread_cond_destroy(&(q->cond));
pthread_mutex_destroy(&(q->mutex));
}
static void Queue_done(Queue *q) {
q->done = 1;
}
// Returns the oldest item from the queue (via a parameter) and returns 1.
// If the queue is empty and done, returns 0.
// If the queue is empty and not done, waits until that changes.
static int Queue_dequeue(Queue *q, unsigned *i) {
while (q->empty && !q->done) {
}
if (q->empty) {
// We are completely done.
return 0;
} else {
*i = q->buf[ q->next_read ];
q->next_read = ( q->next_read + 1 ) % q->max;
q->empty = q->next_read == q->next_insert;
q->full = 0;
return 1;
}
}
// Adds the argument to the queue.
// If the queue is full, waits until that changes.
static void Queue_enqueue(Queue *q, unsigned i) {
while (q->full && !q->done) {
}
if (q->done) {
fprintf(stderr, "Error: Attempted to add item to "done" queue.n");
return;
}
q->buf[q->next_insert] = i;
q->next_insert = ( q->next_insert + 1 ) % q->max;
q->empty = 0;
q->full = q->next_insert == q->next_read;
}
static int msleep(long msec) {
struct timespec ts;
int res;
if (msec < 0) {
errno = EINVAL;
return -1;
}
ts.tv_sec = msec / 1000;
ts.tv_nsec = (msec % 1000) * 1000000;
do {
res = nanosleep(&ts, &ts);
} while (res && errno == EINTR);
return res;
}
// Protects access to stdout.
static pthread_mutex_t stdout_mutex;
static Queue q;
static void *worker(void *worker_id_) {
uintptr_t worker_id = (uintptr_t)worker_id_;
unsigned int seed = worker_id;  // Whatever.
unsigned i;
while (Queue_dequeue(&q, &i)) {
pthread_mutex_lock(&stdout_mutex);
printf("[%" PRIuPTR "] Dequeued %un", worker_id, i);
pthread_mutex_unlock(&stdout_mutex);
// msleep( rand_r(&seed) % 1000 + 1000 );  // Simulate a 1 to 2s load.
pthread_mutex_lock(&stdout_mutex);
printf("[%" PRIuPTR "]    Finished processing %un", worker_id, i);
pthread_mutex_unlock(&stdout_mutex);
}
return NULL;
}
int main(void) {
Queue_init(&q, QUEUE_SIZE);
pthread_t workers[NUM_WORKERS];
for (uintptr_t i=0; i<NUM_WORKERS; ++i) {
if (errno = pthread_create(&(workers[i]), NULL, worker, (void*)i)) {
perror(NULL);
exit(1);
}
}
for (unsigned i=0; i<NUM_ITEMS; ++i) {
pthread_mutex_lock(&stdout_mutex);
printf("[x] Enqueuing %u...n", i);
pthread_mutex_unlock(&stdout_mutex);
Queue_enqueue(&q, i);
pthread_mutex_lock(&stdout_mutex);
printf("[x]    Enqueued %u.n", i);
pthread_mutex_unlock(&stdout_mutex);
}
Queue_done(&q);
pthread_mutex_lock(&stdout_mutex);
printf("[x] Called done.n");
pthread_mutex_unlock(&stdout_mutex);
for (unsigned i=0; i<NUM_WORKERS; ++i)
pthread_join(workers[i], NULL);
Queue_destroy(&q);
pthread_mutex_destroy(&stdout_mutex);
return 0;
}

如果您对此有疑问,请随时发布问题链接作为对此答案的评论。


建议锻炼的解决方案:

static void Queue_done(Queue *q) {
pthread_mutex_lock(&(q->mutex));
q->done = 1;
pthread_cond_signal(&(q->cond));
pthread_mutex_unlock(&(q->mutex));
}
// Returns the oldest item from the queue (via a parameter) and returns 1.
// If the queue is empty and done, returns 0.
// If the queue is empty and not done, waits until that changes.
static int Queue_dequeue(Queue *q, unsigned *i) {
pthread_mutex_lock(&(q->mutex));
while (q->empty && !q->done)
pthread_cond_wait(&(q->cond), &(q->mutex));
int dequeued;
if (q->empty) {
// We are completely done.
dequeued = 0;
} else {
*i = q->buf[ q->next_read ];
q->next_read = ( q->next_read + 1 ) % q->max;
q->empty = q->next_read == q->next_insert;
q->full = 0;
dequeued = 1;
}
pthread_cond_signal(&(q->cond));
pthread_mutex_unlock(&(q->mutex));
return dequeued;
}
// Adds the argument to the queue.
// If the queue is full, waits until that changes.
static void Queue_enqueue(Queue *q, unsigned i) {
pthread_mutex_lock(&(q->mutex));
while (q->full && !q->done)
pthread_cond_wait(&(q->cond), &(q->mutex));
if (q->done) {
fprintf(stderr, "Error: Attempted to add item to "done" queue.n");
} else {
q->buf[q->next_insert] = i;
q->next_insert = ( q->next_insert + 1 ) % q->max;
q->empty = 0;
q->full = q->next_insert == q->next_read;
}
pthread_cond_signal(&(q->cond));
pthread_mutex_unlock(&(q->mutex));
}

最新更新