我正在尝试创建一个多线程管道,将一个函数存储在多个线程中,并使用管道与每个线程和函数通信。当我运行程序时,它一遍又一遍地运行相同的函数,而不是单个函数,我认为我的管道有问题,但我不太确定我到底做错了什么?
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <pthread.h>
#include <stdbool.h>
int num_ints;
typedef void (*Function)(void* input, void* output);
typedef struct Pipeline {
Function* functions;
int numStages;
} Pipeline;
Pipeline* new_Pipeline() {
Pipeline *this = malloc(sizeof(Pipeline));
this->functions = NULL;
this->numStages = 0;
printf("created the pipelinen");
return this;
}
bool Pipeline_add(Pipeline* this, Function f) {
//reallocating memory to add a stage to the functions array
this->functions = realloc(this->functions, (this->numStages +1) * sizeof(Function));
if (this->functions == NULL) {
return false;
}
else {
this->functions[this->numStages] = f;
this->numStages++;
printf("added a stagen");
return true;
}
}
typedef struct {
Function function;
int inputPipe;
int outputPipe;
} thread_args;
void* thread_func(void* arg) {
thread_args *data = (thread_args*) arg;
//get the input and output pipes from the args parameter
int inPipe = data->inputPipe;
int outPipe = data->outputPipe;
data->function((void*)&inPipe,(void*)&outPipe);
return NULL;
}
void Pipeline_execute(Pipeline* this) {
//create threads
pthread_t threads[this->numStages];
thread_args args[this->numStages];
for (int i = 0; i < this->numStages; i++) {
//creating the pipes
int fd[2];
pipe(fd);
//creating input and output pipes for each stage
args[i].function = this->functions[i];
args[i].inputPipe = fd[0];
args[i].outputPipe = fd[1];
if (pthread_create(&threads[i], NULL, thread_func, &args[i]) != 0) {
printf("created a threadn");
perror("pthread_createn");
}
if (i == this->numStages -1) {
close(fd[0]);
}
}
//waiting for threads to finish
for (int i = 0; i < this->numStages; i++) {
if (pthread_join(threads[i], NULL) != 0) {
perror("pthread_joinn");
exit(1);
}
}
//closing pipes
for (int i = 0; i < this->numStages -1; i++) {
}
}
void Pipeline_free(Pipeline* this) {
free(this->functions);
free(this);
}
bool Pipeline_send(void* channel, void* buffer, size_t size) {
if ((write(*(int*)channel, buffer, size)) != -1) {
return true;
} else {
return false;
}
}
bool Pipeline_receive(void* channel, void* buffer, size_t size) {
if ((read(*(int*)channel, buffer, size)) != -1) {
return true;
} else {
return false;
}
}
//an application created to help test the implementation of pipes.
static void generateInts(void* input, void* output) {
printf("generateInts: thread %pn", (void*) pthread_self());
for (int i = 1; i <= num_ints; i++) {
if (!Pipeline_send(output, (void*) &i, sizeof(int))) exit(EXIT_FAILURE);
}
}
static void squareInts(void* input, void* output) {
printf("squareInts: thread %pn", (void*) pthread_self());
for (int i = 1; i <= num_ints; i++) {
int number;
if (!Pipeline_receive(input, (void*) &number, sizeof(int))) exit(EXIT_FAILURE);
int result = number * number;
if (!Pipeline_send(output, (void*) &result, sizeof(int))) exit(EXIT_FAILURE);
}
}
static void sumIntsAndPrint(void* input, void* output) {
printf("sumIntsAndPrint: thread %pn", (void*) pthread_self());
int number = 0;
int result = 0;
for (int i = 1; i <= num_ints; i++) {
if (!Pipeline_receive(input, (void*) &number, sizeof(int))) exit (EXIT_FAILURE);
result += number;
}
printf("sumIntsAndPrint: result = %in", result);
}
static void cleanupExit(Pipeline *p) {
if (p) {
Pipeline_free(p);
}
exit(EXIT_FAILURE);
}
int main() {
scanf("%d", &num_ints);
printf("Setting up pipeline to calculate the sum of squares of integers 1 to %i.n", num_ints);
Pipeline *p = new_Pipeline();
if (p == NULL) cleanupExit(p);
if (!Pipeline_add(p, generateInts)) cleanupExit(p);
if (!Pipeline_add(p, squareInts)) cleanupExit(p);
if (!Pipeline_add(p, sumIntsAndPrint)) cleanupExit(p);
Pipeline_execute(p);
Pipeline_free(p);
return 0;
}
我期望它在不同的线程上运行每个函数,我使用另一段代码来调试它,它创建了三个不同的线程,但每个线程都是相同的函数。当我运行所提供的应用程序来检查管道实现时,它返回给我这个
Setting up pipeline to calculate the sum of squares of integers 1 to 10.
created the pipeline
added a stage
added a stage
added a stage
generateInts: thread 0x5677640
squareInts: thread 0x5e78640
sumIntsAndPrint: thread 0x6679640
==162384==
==162384== HEAP SUMMARY:
==162384== in use at exit: 584 bytes in 4 blocks
==162384== total heap usage: 9 allocs, 5 frees, 10,096 bytes allocated
==162384==
==162384== LEAK SUMMARY:
==162384== definitely lost: 0 bytes in 0 blocks
==162384== indirectly lost: 0 bytes in 0 blocks
==162384== possibly lost: 544 bytes in 2 blocks
==162384== still reachable: 40 bytes in 2 blocks
我更新了代码
InPipeline_execute
…
-
您正在使用
pipe
呼叫中的fd
。 -
您将每个线程的输入和输出设置到相同的管道。因此,它写入自身(创建无限数据循环?)。
-
你必须"摇摇晃晃"将0级的输出输出到1级的输入(例如)的管道。
同时,你的线程函数必须在从目标函数返回后关闭管道单元。
这里是重构的[working?)的代码。特别要注意Pipeline_execute
中的变化。此外,thread_func
.
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <pthread.h>
#include <stdbool.h>
#define CLOSEME(_fd)
do {
if (_fd < 0)
break;
close(_fd);
_fd = -1;
} while (0)
int num_ints;
typedef void (*Function) (void *input, void *output);
typedef struct Pipeline {
Function *functions;
int numStages;
} Pipeline;
Pipeline *
new_Pipeline()
{
Pipeline *this = malloc(sizeof(Pipeline));
this->functions = NULL;
this->numStages = 0;
printf("created the pipelinen");
return this;
}
bool
Pipeline_add(Pipeline *this, Function f)
{
// reallocating memory to add a stage to the functions array
this->functions = realloc(this->functions, (this->numStages + 1) * sizeof(Function));
if (this->functions == NULL) {
return false;
}
else {
this->functions[this->numStages] = f;
this->numStages++;
printf("added a stagen");
return true;
}
}
typedef struct {
Function function;
int inputPipe;
int outputPipe;
} thread_args;
void *
thread_func(void *arg)
{
thread_args *data = (thread_args *) arg;
// get the input and output pipes from the args parameter
int inPipe = data->inputPipe;
int outPipe = data->outputPipe;
data->function((void *) &inPipe, (void *) &outPipe);
CLOSEME(data->inputPipe);
CLOSEME(data->outputPipe);
return NULL;
}
void
Pipeline_execute(Pipeline *this)
{
// create threads
pthread_t threads[this->numStages];
thread_args args[this->numStages];
int fd[2] = { -1, -1 };
for (int i = 0; i < this->numStages; i++) {
// use input side from _prior_ stage for our input
args[i].inputPipe = fd[0];
// last stage has _no_ output
if (i == this->numStages - 1) {
fd[0] = -1;
fd[1] = -1;
}
else
pipe(fd);
// set output for _this_ stage from the pipe output
args[i].outputPipe = fd[1];
// creating input and output pipes for each stage
args[i].function = this->functions[i];
if (pthread_create(&threads[i], NULL, thread_func, &args[i]) != 0) {
printf("created a threadn");
perror("pthread_createn");
}
}
// waiting for threads to finish
for (int i = 0; i < this->numStages; i++) {
if (pthread_join(threads[i], NULL) != 0) {
perror("pthread_joinn");
exit(1);
}
}
// closing pipes
// Now done in thread_func
for (int i = 0; i < this->numStages - 1; i++) {
}
}
void
Pipeline_free(Pipeline *this)
{
free(this->functions);
free(this);
}
bool
Pipeline_send(void *channel, void *buffer, size_t size)
{
if ((write(*(int *) channel, buffer, size)) != -1) {
return true;
}
else {
return false;
}
}
bool
Pipeline_receive(void *channel, void *buffer, size_t size)
{
if ((read(*(int *) channel, buffer, size)) != -1) {
return true;
}
else {
return false;
}
}
//an application created to help test the implementation of pipes.
static void
generateInts(void *input, void *output)
{
printf("generateInts: thread %pn", (void *) pthread_self());
for (int i = 1; i <= num_ints; i++) {
if (!Pipeline_send(output, (void *) &i, sizeof(int)))
exit(EXIT_FAILURE);
}
}
static void
squareInts(void *input, void *output)
{
printf("squareInts: thread %pn", (void *) pthread_self());
for (int i = 1; i <= num_ints; i++) {
int number;
if (!Pipeline_receive(input, (void *) &number, sizeof(int)))
exit(EXIT_FAILURE);
int result = number * number;
if (!Pipeline_send(output, (void *) &result, sizeof(int)))
exit(EXIT_FAILURE);
}
}
static void
sumIntsAndPrint(void *input, void *output)
{
printf("sumIntsAndPrint: thread %pn", (void *) pthread_self());
int number = 0;
int result = 0;
for (int i = 1; i <= num_ints; i++) {
if (!Pipeline_receive(input, (void *) &number, sizeof(int)))
exit(EXIT_FAILURE);
result += number;
}
printf("sumIntsAndPrint: result = %in", result);
}
static void
cleanupExit(Pipeline * p)
{
if (p) {
Pipeline_free(p);
}
exit(EXIT_FAILURE);
}
int
main()
{
printf("Enter number of ints:n");
scanf("%d", &num_ints);
printf("Setting up pipeline to calculate the sum of squares of integers 1 to %i.n", num_ints);
Pipeline *p = new_Pipeline();
if (p == NULL)
cleanupExit(p);
if (!Pipeline_add(p, generateInts))
cleanupExit(p);
if (!Pipeline_add(p, squareInts))
cleanupExit(p);
if (!Pipeline_add(p, sumIntsAndPrint))
cleanupExit(p);
Pipeline_execute(p);
Pipeline_free(p);
return 0;
}
程序输出:
Enter number of ints:
7
Setting up pipeline to calculate the sum of squares of integers 1 to 7.
created the pipeline
added a stage
added a stage
added a stage
generateInts: thread 0x7fc5c10a4700
squareInts: thread 0x7fc5c08a3700
sumIntsAndPrint: thread 0x7fc5c00a2700
sumIntsAndPrint: result = 140
这是基于我的回答:fd泄漏,自定义Shell它使用fork
而不是pthread_create
,但问题是相似的。
你正在把这个传递给你的每个线程:
thread_args args = {
.function = this->functions[i],
.inputPipe = fd[0],
.outputPipe = fd[1],
};
存在于堆栈中,并且不会在定义它的循环之外持续存在。它可能在线程运行时已经不存在了,或者多个线程可能最终读取相同的值。您几乎可以肯定地将相同的地址传递给所有线程。
你所做的相当于:
int a;
a = 1;
pthread_create(..., &a);
a = 2;
pthread_create(..., &a);
但是不能保证这些线程何时会运行。
您需要为每个线程分配结构,该结构与线程的持续时间一样长。最简单的可能是定义:
thread_args args[this->num_stages];
在Pipeline_execute()
的顶部,并填充。
。你想要这样的:
thread_args args[this->num_stages];
for (int i = 0; i < this->numStages; i++) {
//creating the pipes
int fd[2];
pipe(fd);
//creating input and output pipes for each stage
args[i].function = this->functions[i],
args[i].inputPipe = fd[0],
args[i].outputPipe = fd[1],
if (pthread_create(&threads[i], NULL, thread_func, &args[i]) != 0) {
printf("created a threadn");
perror("pthread_createn");
}
...
}