我应该同步两个线程(此时使用这两个线程的两个信号量来完成(。第一个线程应该读取 4 字节(4 个字符(缓冲区块的文本文件并将其复制到共享内存。另一个应该从共享内存中读取并将其写入另一个文件。我的程序在仅完成 16 个字符的进程后停止(因为它无法访问共享内存(。你能帮我修复他的吗?
请注意,im 在 Mac 中编码,这就是为什么信号量与默认 POSIX 略有不同。
提前致谢:)
#include <stdio.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/shm.h>
#include <time.h>
#include <unistd.h>
#include <semaphore.h>
#include <pthread.h>
#include <fcntl.h>
#define ANSI_COLOR_RED "x1b[31m"
#define ANSI_COLOR_GREEN "x1b[32m"
#define ANSI_COLOR_BLUE "x1b[34m"
//#define ANSI_COLOR_YELLOW "x1b[33m"
//#define ANSI_COLOR_MAGENTA "x1b[35m"
//#define ANSI_COLOR_CYAN "x1b[36m"
#define ANSI_COLOR_RESET "x1b[0m"
typedef sem_t Semaphore;
Semaphore * semList[2];
Semaphore *schedularSemaphore;
/*
* SEMAPHORE USAGE
*/
Semaphore *make_semaphore(int value){
Semaphore *semaphore = (Semaphore *) malloc(sizeof(Semaphore));
semaphore = sem_open("/semaphore", O_CREAT, 0644, value);
sem_unlink("/semaphore");
return semaphore;
}
void semaphore_wait(Semaphore *semaphore){
sem_wait(semaphore);
}
void semaphore_signal(Semaphore *semaphore){
sem_post(semaphore);
}
/*
* SEMAPHORE USAGE
*/
#define NOT_READY -1
#define FILLED 0
#define TAKEN 1
struct Memory {
int status;
char *data;
};
int fdIN, fdOUT; /* Input and output file descriptors */
int BUF_SIZE = 4;
ssize_t ret_in, ret_out; /* Number of bytes returned by read() and write() */
char buffer[4]; /* Character buffer */
Semaphore * writerSem;
Semaphore * readerSem;
int readFileWriteToShMem(){
key_t ShmKEY;
int ShmID;
struct Memory *ShmPTR;
int doRead = 1;
while (doRead == 1) {
fprintf(stdout, ANSI_COLOR_BLUE "HERE1" ANSI_COLOR_RESET "n");
ShmKEY = ftok(".", 'x');
ShmID = shmget(ShmKEY, sizeof(struct Memory), IPC_CREAT | 0666);
if (ShmID < 0) {
fprintf(stdout, "*** shmget error (reader) ***n");
exit(1);
}
fprintf(stdout, ANSI_COLOR_BLUE "HERE2" ANSI_COLOR_RESET "n");
fprintf(stdout, ANSI_COLOR_BLUE "HERE3" ANSI_COLOR_RESET "n");
semaphore_wait(readerSem);
ShmPTR = (struct Memory *) shmat(ShmID, NULL, 0);
if ((int) ShmPTR == -1) {
fprintf(stdout, ANSI_COLOR_RED "*** shmat error (reader) ***" ANSI_COLOR_RESET "n");
exit(1);
}
fprintf(stdout, ANSI_COLOR_BLUE "HERE4" ANSI_COLOR_RESET "n");
if((ret_in = read (fdIN, &buffer, BUF_SIZE)) > 0){
fprintf(stdout, ANSI_COLOR_BLUE "HERE5" ANSI_COLOR_RESET "n");
fprintf(stdout, ANSI_COLOR_BLUE "Reader has received a shared memory of 4 bytes..." ANSI_COLOR_RESET "n");
fprintf(stdout, ANSI_COLOR_BLUE "Reader has attached the shared memory..." ANSI_COLOR_RESET "n");
ShmPTR->status = NOT_READY;
ShmPTR->data = buffer;
fprintf(stdout, ANSI_COLOR_BLUE "Reader has filled "%s" to shared memory..." ANSI_COLOR_RESET "n",
ShmPTR->data);
ShmPTR->status = FILLED;
semaphore_signal(writerSem);
while (ShmPTR->status != TAKEN){
usleep(1);
}
} else {
fprintf(stdout, ANSI_COLOR_BLUE "HERE6" ANSI_COLOR_RESET "n");
doRead = 0;
}
}
fprintf(stdout, ANSI_COLOR_BLUE "Reader has detected the completion of its task..." ANSI_COLOR_RESET "n");
shmdt((void *) ShmPTR);
fprintf(stdout, ANSI_COLOR_BLUE "Reader has detached its shared memory..." ANSI_COLOR_RESET "n");
shmctl(ShmID, IPC_RMID, NULL);
fprintf(stdout, ANSI_COLOR_BLUE "Reader has removed its shared memory..." ANSI_COLOR_RESET "n");
fprintf(stdout, ANSI_COLOR_BLUE "Reader exits...n");
exit(0);
}
int writeFileReadFromShMem(){
key_t ShmKEY;
int ShmID;
struct Memory *ShmPTR;
int doWrite = 1;
while (doWrite){
ShmKEY = ftok(".", 'x');
ShmID = shmget(ShmKEY, sizeof(struct Memory), 0666);
if (ShmID < 0) {
fprintf(stdout, ANSI_COLOR_GREEN "*** shmget error (writer) ***" ANSI_COLOR_RESET "n");
exit(1);
}
semaphore_wait(writerSem);
fprintf(stdout, ANSI_COLOR_GREEN "Writer has received a shared memory of four bytes..." ANSI_COLOR_RESET "n");
ShmPTR = (struct Memory *) shmat(ShmID, NULL, 0);
if ((int) ShmPTR == -1) {
fprintf(stdout, ANSI_COLOR_GREEN "*** shmat error (writer) ***" ANSI_COLOR_RESET "n");
exit(1);
}
fprintf(stdout, ANSI_COLOR_GREEN "Writer has attached the shared memory..." ANSI_COLOR_RESET "n");
while (ShmPTR->status != FILLED)
;
ret_out = write (fdOUT, &buffer, (ssize_t) ret_in);
write(fdOUT, "n", 1);
if(ret_out != ret_in){
///Write error
fprintf(stdout, ANSI_COLOR_RED "write error" ANSI_COLOR_RESET "n");
doWrite = 0;
}
fprintf(stdout, ANSI_COLOR_GREEN "Writer found the data is ready..." ANSI_COLOR_RESET "n");
fprintf(stdout, ANSI_COLOR_GREEN "Writer found "%s" in shared memory..." ANSI_COLOR_RESET "n",
ShmPTR->data);
ShmPTR->status = TAKEN;
fprintf(stdout, ANSI_COLOR_GREEN "Writer has informed Reader data have been taken..." ANSI_COLOR_RESET "n");
semaphore_signal(readerSem);
}
shmdt((void *) ShmPTR);
fprintf(stdout, ANSI_COLOR_GREEN "Writer has detached its shared memory..." ANSI_COLOR_RESET "n");
fprintf(stdout, ANSI_COLOR_GREEN "Writer exits..." ANSI_COLOR_RESET "n");
exit(0);
}
int main(int argc, char* argv[]){
fdIN = open("textin.txt", O_RDONLY, 0644);
fdOUT = open("textout.txt", O_WRONLY | O_CREAT, 0644);
if (fdIN == -1) {
perror ("error while opening input file");
return 2;
}
if(fdOUT == -1){
perror ("error while opening output file");
return 3;
}
pthread_t *readerThread, *writerThread;
readerThread = (pthread_t *)malloc(sizeof(*readerThread));
writerThread = (pthread_t *)malloc(sizeof(*writerThread));
readerSem = make_semaphore(1);
writerSem = make_semaphore(0);
pthread_create(readerThread, NULL, (void*)readFileWriteToShMem, NULL);
pthread_create(writerThread, NULL, (void*)writeFileReadFromShMem, NULL);
pthread_join(*readerThread, NULL);
pthread_join(*writerThread, NULL);
semaphore_signal(readerSem);
return 1;
}
这是最终的工作代码。 无需在每次迭代时初始化和访问共享内存。
#include <stdio.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/shm.h>
#include <time.h>
#include <unistd.h>
#include <semaphore.h>
#include <pthread.h>
#include <fcntl.h>
#define ANSI_COLOR_RED "x1b[31m"
#define ANSI_COLOR_GREEN "x1b[32m"
#define ANSI_COLOR_BLUE "x1b[34m"
//#define ANSI_COLOR_YELLOW "x1b[33m"
//#define ANSI_COLOR_MAGENTA "x1b[35m"
//#define ANSI_COLOR_CYAN "x1b[36m"
#define ANSI_COLOR_RESET "x1b[0m"
typedef sem_t Semaphore;
/*
* SEMAPHORE USAGE
*/
Semaphore *make_semaphore(int value){
Semaphore *semaphore = (Semaphore *) malloc(sizeof(Semaphore));
semaphore = sem_open("/semaphore", O_CREAT, 0644, value);
sem_unlink("/semaphore");
return semaphore;
}
void semaphore_wait(Semaphore *semaphore){
sem_wait(semaphore);
}
void semaphore_signal(Semaphore *semaphore){
sem_post(semaphore);
}
/*
* SEMAPHORE USAGE
*/
#define NOT_READY -1
#define FILLED 0
#define TAKEN 1
struct Memory {
int status;
char *data;
};
int fdIN, fdOUT; /* Input and output file descriptors */
int BUF_SIZE = 4;
ssize_t ret_in, ret_out; /* Number of bytes returned by read() and write() */
char buffer[4]; /* Character buffer */
Semaphore * writerSem, *readerSem;
key_t ShmKEY;
int ShmID;
struct Memory *ShmPTR;
int readFileWriteToShMem(){
ShmKEY = ftok(".", 'x');
ShmID = shmget(ShmKEY, sizeof(struct Memory), IPC_CREAT | 0666);
ShmPTR = (struct Memory *) shmat(ShmID, NULL, 0);
int doRead = 1;
while (doRead == 1) {
if (ShmID < 0) {
fprintf(stdout, "*** shmget error (reader) ***n");
exit(1);
}
semaphore_wait(readerSem);
if ((int) ShmPTR == -1) {
fprintf(stdout, ANSI_COLOR_RED "*** shmat error (reader) ***" ANSI_COLOR_RESET "n");
exit(1);
}
if((ret_in = read (fdIN, &buffer, BUF_SIZE)) > 0){
fprintf(stdout, ANSI_COLOR_BLUE "Reader has attached the shared memory and received 4 bytes..." ANSI_COLOR_RESET "n");
ShmPTR->status = NOT_READY;
ShmPTR->data = buffer;
fprintf(stdout, ANSI_COLOR_BLUE "Reader has filled "%s" to shared memory..." ANSI_COLOR_RESET "n", ShmPTR->data);
ShmPTR->status = FILLED;
semaphore_signal(writerSem);
while (ShmPTR->status != TAKEN){
usleep(1);
fprintf(stdout, ANSI_COLOR_BLUE "Reader is sleeping..." ANSI_COLOR_RESET "n");
}
} else {
doRead = 0;
}
}
fprintf(stdout, ANSI_COLOR_BLUE "Reader has detected the completion of its task..." ANSI_COLOR_RESET "n");
shmdt((void *) ShmPTR);
fprintf(stdout, ANSI_COLOR_BLUE "eader has detected the completion of its task and detached its shared memory..." ANSI_COLOR_RESET "n");
shmctl(ShmID, IPC_RMID, NULL);
fprintf(stdout, ANSI_COLOR_BLUE "Reader has removed its shared memory and now halts..." ANSI_COLOR_RESET "n");
exit(0);
}
int writeFileReadFromShMem(){
int doWrite = 1;
while (doWrite){
if (ShmID < 0) {
fprintf(stdout, ANSI_COLOR_GREEN "*** shmget error (writer) ***" ANSI_COLOR_RESET "n");
exit(1);
}
semaphore_wait(writerSem);
if ((int) ShmPTR == -1) {
fprintf(stdout, ANSI_COLOR_GREEN "*** shmat error (writer) ***" ANSI_COLOR_RESET "n");
exit(1);
}
fprintf(stdout, ANSI_COLOR_GREEN "Writer has attached the shared memory and received 4 bytes..." ANSI_COLOR_RESET "n");
while (ShmPTR->status != FILLED);
ret_out = write (fdOUT, &buffer, (ssize_t) ret_in);
write(fdOUT, "n", 1);
if(ret_out != ret_in){
///Write error
fprintf(stdout, ANSI_COLOR_RED "write error" ANSI_COLOR_RESET "n");
doWrite = 0;
}
fprintf(stdout, ANSI_COLOR_GREEN "Writer found "%s" in shared memory..." ANSI_COLOR_RESET "n",
ShmPTR->data);
ShmPTR->status = TAKEN;
fprintf(stdout, ANSI_COLOR_GREEN "Writer has informed Reader data have been taken..." ANSI_COLOR_RESET "n");
semaphore_signal(readerSem);
}
shmdt((void *) ShmPTR);
fprintf(stdout, ANSI_COLOR_GREEN "Writer has detached its shared memory and now halts..." ANSI_COLOR_RESET "n");
exit(0);
}
int main(int argc, char* argv[]){
fdIN = open("textin.txt", O_RDONLY, 0644);
fdOUT = open("textout.txt", O_WRONLY | O_CREAT, 0644);
if (fdIN == -1) {
perror ("error while opening input file");
return 2;
}
if(fdOUT == -1){
perror ("error while opening output file");
return 3;
}
pthread_t *readerThread, *writerThread;
readerThread = (pthread_t *)malloc(sizeof(*readerThread));
writerThread = (pthread_t *)malloc(sizeof(*writerThread));
readerSem = make_semaphore(1);
writerSem = make_semaphore(0);
pthread_create(readerThread, NULL, (void*)readFileWriteToShMem, NULL);
pthread_create(writerThread, NULL, (void*)writeFileReadFromShMem, NULL);
pthread_join(*readerThread, NULL);
pthread_join(*writerThread, NULL);
semaphore_signal(readerSem);
return 1;
}