使用共享内存进行读取器和编写器线程的通信



我应该同步两个线程(此时使用这两个线程的两个信号量来完成(。第一个线程应该读取 4 字节(4 个字符(缓冲区块的文本文件并将其复制到共享内存。另一个应该从共享内存中读取并将其写入另一个文件。我的程序在仅完成 16 个字符的进程后停止(因为它无法访问共享内存(。你能帮我修复他的吗?

请注意,im 在 Mac 中编码,这就是为什么信号量与默认 POSIX 略有不同。

提前致谢:)

#include <stdio.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/shm.h>
#include <time.h>
#include <unistd.h>
#include <semaphore.h>
#include <pthread.h>
#include <fcntl.h>
#define ANSI_COLOR_RED     "x1b[31m"
#define ANSI_COLOR_GREEN   "x1b[32m"
#define ANSI_COLOR_BLUE    "x1b[34m"
//#define ANSI_COLOR_YELLOW  "x1b[33m"
//#define ANSI_COLOR_MAGENTA "x1b[35m"
//#define ANSI_COLOR_CYAN    "x1b[36m"
#define ANSI_COLOR_RESET   "x1b[0m"

typedef sem_t Semaphore;
Semaphore * semList[2];
Semaphore *schedularSemaphore;

/*
* SEMAPHORE USAGE
*/
Semaphore *make_semaphore(int value){
    Semaphore *semaphore = (Semaphore *) malloc(sizeof(Semaphore));
    semaphore = sem_open("/semaphore", O_CREAT, 0644, value);
    sem_unlink("/semaphore");
    return semaphore;
}
void semaphore_wait(Semaphore *semaphore){
    sem_wait(semaphore);
}
void semaphore_signal(Semaphore *semaphore){
    sem_post(semaphore);
}
/*
* SEMAPHORE USAGE
*/
#define  NOT_READY  -1
#define  FILLED     0
#define  TAKEN      1
struct Memory {
     int  status;
     char  *data;
};
int fdIN, fdOUT;            /* Input and output file descriptors */
int BUF_SIZE = 4;   
ssize_t ret_in, ret_out;    /* Number of bytes returned by read() and write() */
char buffer[4];             /* Character buffer */

Semaphore * writerSem;
Semaphore * readerSem;
int readFileWriteToShMem(){
    key_t          ShmKEY;
    int            ShmID;
    struct Memory  *ShmPTR;

    int doRead = 1;
    while (doRead == 1) {

        fprintf(stdout, ANSI_COLOR_BLUE "HERE1" ANSI_COLOR_RESET "n");
        ShmKEY = ftok(".", 'x');
        ShmID = shmget(ShmKEY, sizeof(struct Memory), IPC_CREAT | 0666);
        if (ShmID < 0) {
            fprintf(stdout, "*** shmget error (reader) ***n");
            exit(1);
        }
        fprintf(stdout, ANSI_COLOR_BLUE "HERE2" ANSI_COLOR_RESET "n");

        fprintf(stdout, ANSI_COLOR_BLUE "HERE3" ANSI_COLOR_RESET "n");
        semaphore_wait(readerSem);

        ShmPTR = (struct Memory *) shmat(ShmID, NULL, 0);
        if ((int) ShmPTR == -1) {
            fprintf(stdout, ANSI_COLOR_RED "*** shmat error (reader) ***" ANSI_COLOR_RESET "n");
            exit(1);
        }
        fprintf(stdout, ANSI_COLOR_BLUE "HERE4" ANSI_COLOR_RESET "n");
        if((ret_in = read (fdIN, &buffer, BUF_SIZE)) > 0){
            fprintf(stdout, ANSI_COLOR_BLUE "HERE5" ANSI_COLOR_RESET "n");
            fprintf(stdout, ANSI_COLOR_BLUE "Reader has received a shared memory of 4 bytes..." ANSI_COLOR_RESET "n");
            fprintf(stdout, ANSI_COLOR_BLUE "Reader has attached the shared memory..." ANSI_COLOR_RESET "n");
            ShmPTR->status = NOT_READY;
            ShmPTR->data = buffer;
            fprintf(stdout, ANSI_COLOR_BLUE "Reader has filled  "%s" to shared memory..." ANSI_COLOR_RESET "n",
            ShmPTR->data);
            ShmPTR->status = FILLED;
            semaphore_signal(writerSem);
            while (ShmPTR->status != TAKEN){
                usleep(1);
            }

        } else {
            fprintf(stdout, ANSI_COLOR_BLUE "HERE6" ANSI_COLOR_RESET "n");
            doRead = 0;
        }
    }

    fprintf(stdout, ANSI_COLOR_BLUE "Reader has detected the completion of its task..." ANSI_COLOR_RESET "n");
    shmdt((void *) ShmPTR);
    fprintf(stdout, ANSI_COLOR_BLUE "Reader has detached its shared memory..." ANSI_COLOR_RESET "n");
    shmctl(ShmID, IPC_RMID, NULL);
    fprintf(stdout, ANSI_COLOR_BLUE "Reader has removed its shared memory..." ANSI_COLOR_RESET "n");
    fprintf(stdout, ANSI_COLOR_BLUE "Reader exits...n");
    exit(0);
}

int writeFileReadFromShMem(){
    key_t          ShmKEY;
    int            ShmID;
    struct Memory  *ShmPTR;

    int doWrite = 1;
    while (doWrite){
        ShmKEY = ftok(".", 'x');
        ShmID = shmget(ShmKEY, sizeof(struct Memory), 0666);
        if (ShmID < 0) {
            fprintf(stdout, ANSI_COLOR_GREEN "*** shmget error (writer) ***" ANSI_COLOR_RESET "n");
            exit(1);
        }
        semaphore_wait(writerSem);
        fprintf(stdout, ANSI_COLOR_GREEN "Writer has received a shared memory of four bytes..." ANSI_COLOR_RESET "n");
        ShmPTR = (struct Memory *) shmat(ShmID, NULL, 0);
        if ((int) ShmPTR == -1) {
            fprintf(stdout, ANSI_COLOR_GREEN "*** shmat error (writer) ***" ANSI_COLOR_RESET "n");
            exit(1);
        }
        fprintf(stdout, ANSI_COLOR_GREEN "Writer has attached the shared memory..." ANSI_COLOR_RESET "n");
        while (ShmPTR->status != FILLED)
        ;
        ret_out = write (fdOUT, &buffer, (ssize_t) ret_in);
        write(fdOUT, "n", 1);
        if(ret_out != ret_in){
            ///Write error 
            fprintf(stdout, ANSI_COLOR_RED "write error" ANSI_COLOR_RESET "n");
            doWrite = 0;
        }
        fprintf(stdout, ANSI_COLOR_GREEN "Writer found the data is ready..." ANSI_COLOR_RESET "n");
        fprintf(stdout, ANSI_COLOR_GREEN "Writer found "%s" in shared memory..." ANSI_COLOR_RESET "n",
            ShmPTR->data);
        ShmPTR->status = TAKEN;
        fprintf(stdout, ANSI_COLOR_GREEN "Writer has informed Reader data have been taken..." ANSI_COLOR_RESET "n");
        semaphore_signal(readerSem);
    }
    shmdt((void *) ShmPTR);
    fprintf(stdout, ANSI_COLOR_GREEN "Writer has detached its shared memory..." ANSI_COLOR_RESET "n");
    fprintf(stdout, ANSI_COLOR_GREEN "Writer exits..." ANSI_COLOR_RESET "n");
    exit(0);
}

int main(int argc, char* argv[]){

    fdIN = open("textin.txt", O_RDONLY, 0644);
    fdOUT = open("textout.txt", O_WRONLY | O_CREAT, 0644);

    if (fdIN == -1) {
        perror ("error while opening input file");
        return 2;
    }
    if(fdOUT == -1){
        perror ("error while opening output file");
        return 3;
    }
    pthread_t *readerThread, *writerThread;
    readerThread    = (pthread_t *)malloc(sizeof(*readerThread));
    writerThread    = (pthread_t *)malloc(sizeof(*writerThread));
    readerSem = make_semaphore(1);
    writerSem = make_semaphore(0);
    pthread_create(readerThread, NULL, (void*)readFileWriteToShMem, NULL);
    pthread_create(writerThread, NULL, (void*)writeFileReadFromShMem, NULL);

    pthread_join(*readerThread, NULL);
    pthread_join(*writerThread, NULL);
    semaphore_signal(readerSem);
    return 1;
}

这是最终的工作代码。 无需在每次迭代时初始化和访问共享内存。

#include <stdio.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/shm.h>
#include <time.h>
#include <unistd.h>
#include <semaphore.h>
#include <pthread.h>
#include <fcntl.h>
#define ANSI_COLOR_RED     "x1b[31m"
#define ANSI_COLOR_GREEN   "x1b[32m"
#define ANSI_COLOR_BLUE    "x1b[34m"
//#define ANSI_COLOR_YELLOW  "x1b[33m"
//#define ANSI_COLOR_MAGENTA "x1b[35m"
//#define ANSI_COLOR_CYAN    "x1b[36m"
#define ANSI_COLOR_RESET   "x1b[0m"
typedef sem_t Semaphore;
/*
* SEMAPHORE USAGE
*/
Semaphore *make_semaphore(int value){
    Semaphore *semaphore = (Semaphore *) malloc(sizeof(Semaphore));
    semaphore = sem_open("/semaphore", O_CREAT, 0644, value);
    sem_unlink("/semaphore");
    return semaphore;
}
void semaphore_wait(Semaphore *semaphore){
    sem_wait(semaphore);
}
void semaphore_signal(Semaphore *semaphore){
    sem_post(semaphore);
}
/*
* SEMAPHORE USAGE
*/
#define  NOT_READY  -1
#define  FILLED     0
#define  TAKEN      1
struct Memory {
     int  status;
     char  *data;
};
int fdIN, fdOUT;            /* Input and output file descriptors */
int BUF_SIZE = 4;   
ssize_t ret_in, ret_out;    /* Number of bytes returned by read() and write() */
char buffer[4];             /* Character buffer */
Semaphore * writerSem, *readerSem;
key_t          ShmKEY;
int            ShmID;
struct Memory  *ShmPTR;
int readFileWriteToShMem(){
    ShmKEY = ftok(".", 'x');
    ShmID = shmget(ShmKEY, sizeof(struct Memory), IPC_CREAT | 0666);
    ShmPTR = (struct Memory *) shmat(ShmID, NULL, 0);
    int doRead = 1;
    while (doRead == 1) {
        if (ShmID < 0) {
            fprintf(stdout, "*** shmget error (reader) ***n");
            exit(1);
        }
        semaphore_wait(readerSem);
        if ((int) ShmPTR == -1) {
            fprintf(stdout, ANSI_COLOR_RED "*** shmat error (reader) ***" ANSI_COLOR_RESET "n");
            exit(1);
        }
        if((ret_in = read (fdIN, &buffer, BUF_SIZE)) > 0){
            fprintf(stdout, ANSI_COLOR_BLUE "Reader has attached the shared memory and received 4 bytes..." ANSI_COLOR_RESET "n");
            ShmPTR->status = NOT_READY;
            ShmPTR->data = buffer;
            fprintf(stdout, ANSI_COLOR_BLUE "Reader has filled  "%s" to shared memory..." ANSI_COLOR_RESET "n", ShmPTR->data);
            ShmPTR->status = FILLED;
            semaphore_signal(writerSem);
            while (ShmPTR->status != TAKEN){
                usleep(1);
                fprintf(stdout, ANSI_COLOR_BLUE "Reader is sleeping..." ANSI_COLOR_RESET "n");
            }
        } else {
            doRead = 0;
        }
    }
    fprintf(stdout, ANSI_COLOR_BLUE "Reader has detected the completion of its task..." ANSI_COLOR_RESET "n");
    shmdt((void *) ShmPTR);
    fprintf(stdout, ANSI_COLOR_BLUE "eader has detected the completion of its task and detached its shared memory..." ANSI_COLOR_RESET "n");
    shmctl(ShmID, IPC_RMID, NULL);
    fprintf(stdout, ANSI_COLOR_BLUE "Reader has removed its shared memory and now halts..." ANSI_COLOR_RESET "n");
    exit(0);
}
int writeFileReadFromShMem(){
    int doWrite = 1;
    while (doWrite){
        if (ShmID < 0) {
            fprintf(stdout, ANSI_COLOR_GREEN "*** shmget error (writer) ***" ANSI_COLOR_RESET "n");
            exit(1);
        }
        semaphore_wait(writerSem);
        if ((int) ShmPTR == -1) {
            fprintf(stdout, ANSI_COLOR_GREEN "*** shmat error (writer) ***" ANSI_COLOR_RESET "n");
            exit(1);
        }
        fprintf(stdout, ANSI_COLOR_GREEN "Writer has attached the shared memory and received 4 bytes..." ANSI_COLOR_RESET "n");
        while (ShmPTR->status != FILLED);
        ret_out = write (fdOUT, &buffer, (ssize_t) ret_in);
        write(fdOUT, "n", 1);
        if(ret_out != ret_in){
            ///Write error 
            fprintf(stdout, ANSI_COLOR_RED "write error" ANSI_COLOR_RESET "n");
            doWrite = 0;
        }
        fprintf(stdout, ANSI_COLOR_GREEN "Writer found "%s" in shared memory..." ANSI_COLOR_RESET "n",
            ShmPTR->data);
        ShmPTR->status = TAKEN;
        fprintf(stdout, ANSI_COLOR_GREEN "Writer has informed Reader data have been taken..." ANSI_COLOR_RESET "n");
        semaphore_signal(readerSem);
    }
    shmdt((void *) ShmPTR);
    fprintf(stdout, ANSI_COLOR_GREEN "Writer has detached its shared memory and now halts..." ANSI_COLOR_RESET "n");
    exit(0);
}
int main(int argc, char* argv[]){

    fdIN = open("textin.txt", O_RDONLY, 0644);
    fdOUT = open("textout.txt", O_WRONLY | O_CREAT, 0644);

    if (fdIN == -1) {
        perror ("error while opening input file");
        return 2;
    }
    if(fdOUT == -1){
        perror ("error while opening output file");
        return 3;
    }
    pthread_t *readerThread, *writerThread;
    readerThread    = (pthread_t *)malloc(sizeof(*readerThread));
    writerThread    = (pthread_t *)malloc(sizeof(*writerThread));
    readerSem = make_semaphore(1);
    writerSem = make_semaphore(0);
    pthread_create(readerThread, NULL, (void*)readFileWriteToShMem, NULL);
    pthread_create(writerThread, NULL, (void*)writeFileReadFromShMem, NULL);
    pthread_join(*readerThread, NULL);
    pthread_join(*writerThread, NULL);
    semaphore_signal(readerSem);
    return 1;
}

最新更新