c-如何从不同的线程退出对线程的recv()的阻塞调用



我有一个运行两个线程的代码,

第一线程使用recv()sender上等待数据,然后使用send将数据转发到receiver

第二线程使用recv()receiver上等待数据,然后使用send将数据转发到sender

重要的是这两者要并行工作。

假设发送方断开连接,第一个线程检测到这一点并关闭连接。

如何告诉仍在等待接收器的第二个线程连接已关闭,不需要进一步通信?

CCD_ 9已经用CCD_。

CCD_ 11已经用CCD_。

sender_fd是发送方的套接字文件描述符。

reciever_fd是接收器的套接字文件描述符。

void* sender_to_reciever(){
int t1;packet* p = malloc(sizeof(packet));
while((t1=recieve_packet(sender_fd,&p))!=0){
send_packet(reciever_fd,p);
}
close(sender_fd);
}

void* reciever_to_sender(){
int t1;packet* p = malloc(sizeof(packet));
while((t1=recieve_packet(reciever_fd,&p))!=0){
send_packet(sender_fd,p);
}
close(reciever_fd);
}

我不想更改send_packetrecieve_packet函数调用的实现。

如果while循环退出,我尝试关闭sender_fdreciever_d。然而,这并没有奏效。

处理发送方和接收方的信道.c的代码:-

#include "packets.c"

#define SERVER_PORT "8642"
#define QUEUE_LENGTH 10
void handle_connection(int);
void* sender_to_receiver();
void* receiver_to_sender();
int open_outgoing_connection(char*, char*);
int sender_fd;
int receiver_fd;
int main(int argc, char* argv[]){
int sock_fd,new_fd,rv,yes;yes=1;
struct addrinfo hints,*res;
struct sockaddr_storage client_addr;
socklen_t addr_size;
char client_details[INET6_ADDRSTRLEN];

struct sigaction sa;
memset(&hints, 0, sizeof hints);
hints.ai_family = AF_UNSPEC;
hints.ai_socktype = SOCK_STREAM;
hints.ai_flags = AI_PASSIVE;
if((rv=getaddrinfo(NULL, SERVER_PORT, &hints, &res))!=0){
printf("Error getaddrinfo : %sn",gai_strerror(rv));
return 1;
}

if((sock_fd = socket(res->ai_family, res->ai_socktype, res->ai_protocol))==-1){
printf("Error socket file descriptorn");
return 1;
}
if(setsockopt(sock_fd, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(int)) == -1){
printf("Error setsockoptn");
return 1;
}
if(bind(sock_fd,res->ai_addr,res->ai_addrlen)==-1){
close(sock_fd);
printf("Error bindn");
return 1;
}
if(listen(sock_fd, QUEUE_LENGTH)==-1){
printf("Error listenn");
return 1;
}
freeaddrinfo(res);
sa.sa_handler = sigchld_handler;
sigemptyset(&sa.sa_mask);
sa.sa_flags = SA_RESTART;
if(sigaction(SIGCHLD,&sa,NULL) == -1){
printf("Error sigactionn");
return 1;
}
// Now we have prepared the socket(ip+port) for accepting incoming connections.
printf("Server PID : %dn",getpid());
while(1){
addr_size = sizeof client_addr;
new_fd = accept(sock_fd,(struct sockaddr*)&client_addr, &addr_size);
if(new_fd==-1){
printf("Error Accepting Request %dn",getpid());
return 1;
}
inet_ntop(client_addr.ss_family,get_in_addr((struct sockaddr*)&client_addr),client_details,sizeof client_details);
if(!fork()){ // Child Process
close(sock_fd);
printf("Connection Accepted From : %s by PID:%dn",client_details,getpid());
handle_connection(new_fd);
exit(0);
}
}
return 1;
}

void handle_connection(int socket_sender){
packet* p = malloc(sizeof(packet));
int t1;
if((t1=receive_packet(socket_sender, &p))==0){
printf("Closed Connectionn");
}else{
int socket_receiver;
if((socket_receiver=open_outgoing_connection(p->destination_ip,p->destination_port))!=-1){      
sender_fd = socket_sender;
receiver_fd = socket_receiver;
pthread_t str,rts;
str     = pthread_self();
rts   = pthread_self();
pthread_create(&str,NULL,sender_to_receiver,NULL);
pthread_create(&rts,NULL,receiver_to_sender,NULL);
pthread_join(str,NULL);
pthread_join(rts,NULL);
}else{
printf("Error Connecting to receivern");
}
}
}
void* sender_to_receiver(){
int t1;packet* p = malloc(sizeof(packet));
while((t1=receive_packet(sender_fd,&p))!=0){
printf("SENDERn");
display_packet(p);
send_packet(receiver_fd,p);
}
printf("Sender Disconnectedn");
close(sender_fd);close(receiver_fd);
}
void* receiver_to_sender(){
int t1;packet* p = malloc(sizeof(packet));
while((t1=receive_packet(receiver_fd,&p))!=0){
printf("Receivern");
display_packet(p);
send_packet(sender_fd,p);
}
printf("Receiver Disconnectedn");
close(receiver_fd);close(sender_fd);
}
int open_outgoing_connection(char* ip, char* port){
int gai;
char server_ip[100];memset(server_ip,'',sizeof(server_ip));
struct addrinfo hints,*server;
memset(&hints,0,sizeof hints);
hints.ai_family     = AF_UNSPEC;
hints.ai_socktype   = SOCK_STREAM;
int socket_fd;
if((gai=getaddrinfo(ip,port,&hints,&server)) != 0){
printf("GetAddrInfo Error: %sn",gai_strerror(gai));
return -1;
}
if((socket_fd = socket(server->ai_family, server->ai_socktype, server->ai_protocol)) == -1){
printf("Socket Errorn");
return -1;
}
if(connect(socket_fd,server->ai_addr,server->ai_addrlen) == -1){
printf("Connect Errorn");
return -1;
}
freeaddrinfo(server);
inet_ntop(server->ai_family, get_in_addr((struct sockaddr*)server->ai_addr), server_ip, sizeof(server_ip));
printf("Connected to: %sn",server_ip);
return socket_fd;
}

接收方代码:-

#include "packets.c"
#define QUEUE_LENGTH 10
void handle_connection(int);
int main(int argc, char* argv[]){
if(argc!=2){
printf("Enter PORTn");
return 1;
}
int sock_fd,new_fd,rv,yes;yes=1;
struct addrinfo hints,*res;
struct sockaddr_storage client_addr;
socklen_t addr_size;
char client_details[INET6_ADDRSTRLEN];

struct sigaction sa;
memset(&hints, 0, sizeof hints);
hints.ai_family = AF_UNSPEC;
hints.ai_socktype = SOCK_STREAM;
hints.ai_flags = AI_PASSIVE;
if((rv=getaddrinfo(NULL, argv[1], &hints, &res))!=0){
printf("Error getaddrinfo : %sn",gai_strerror(rv));
return 1;
}

if((sock_fd = socket(res->ai_family, res->ai_socktype, res->ai_protocol))==-1){
printf("Error socket file descriptorn");
return 1;
}
if(setsockopt(sock_fd, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(int)) == -1){
printf("Error setsockoptn");
return 1;
}
if(bind(sock_fd,res->ai_addr,res->ai_addrlen)==-1){
close(sock_fd);
printf("Error bindn");
return 1;
}
if(listen(sock_fd, QUEUE_LENGTH)==-1){
printf("Error listenn");
return 1;
}
freeaddrinfo(res);
sa.sa_handler = sigchld_handler;
sigemptyset(&sa.sa_mask);
sa.sa_flags = SA_RESTART;
if(sigaction(SIGCHLD,&sa,NULL) == -1){
printf("Error sigactionn");
return 1;
}
// Now we have prepared the socket(ip+port) for accepting incoming connections.
printf("Server PID : %dn",getpid());
while(1){
addr_size = sizeof client_addr;
new_fd = accept(sock_fd,(struct sockaddr*)&client_addr, &addr_size);
if(new_fd==-1){
printf("Error Accepting Request %dn",getpid());
return 1;
}
inet_ntop(client_addr.ss_family,get_in_addr((struct sockaddr*)&client_addr),client_details,sizeof client_details);
if(!fork()){ // Child Process
close(sock_fd);
printf("Connection Accepted From : %s by PID:%dn",client_details,getpid());
handle_connection(new_fd);
exit(0);
}
close(new_fd);
}
return 1;
}

void handle_connection(int socket_fd){
int t1;packet* p = malloc(sizeof(packet));
while((t1=receive_packet(socket_fd,&p))!=0){
display_packet(p);
p->message = "ACK";
p->timestamp = get_time_in_ns();
send_packet(socket_fd,p);
}
printf("Sender Disconnectedn");
}

发件人代码:-

#include "packets.c"
#define CHANNEL_PORT "8642"
#define CHANNEL_IP   "127.0.0.1"
int socket_fd;
char* destination_ip;
char* destination_port;
int number_of_packets;
char message[MESSAGE_BUFFER_LEN];

void prepare_packet_header(packet *p){
p->destination_ip=destination_ip;
p->destination_port=destination_port;
p->timestamp = get_time_in_ns();
p->length=0;
}

void divide_message_and_send_packets(){
if(number_of_packets>strlen(message)){
number_of_packets = strlen(message);
}
int indi_len = strlen(message)/number_of_packets;
int lm = 0;int i,j;
packet* all_packets[number_of_packets];
for(i=0;i<number_of_packets;i+=1)all_packets[i]=malloc(sizeof(packet));
// HANDSHAKE
prepare_packet_header(all_packets[0]);
all_packets[0]->message="SYN";
all_packets[0]->uid=-1;
send_packet(socket_fd,all_packets[0]);
// HANDSHAKE OVER
for(i=0;i<number_of_packets;i+=1){
// printf("Processing Packet: %d with message[%d:%d]n",i,lm,lm+indi_len);
if(i!=number_of_packets-1){
char temp[indi_len+1];memset(temp,'', sizeof temp);
for(j=lm;j<lm+indi_len;j+=1)temp[j-lm]=message[j];
all_packets[i]->message = malloc(sizeof temp);
strcpy(all_packets[i]->message, temp);
all_packets[i]->uid = i;
}else{
char temp[strlen(message)-lm+1];memset(temp,'', sizeof temp);
for(j=lm;j<strlen(message);j+=1)temp[j-lm]=message[j];
all_packets[i]->message = malloc(sizeof temp);
strcpy(all_packets[i]->message, temp);
all_packets[i]->uid = i;
}
prepare_packet_header(all_packets[i]);lm+=indi_len;
display_packet(all_packets[i]);
send_packet(socket_fd,all_packets[i]);
}
}
void main(int argc,char* argv[]){
if(argc!=5){
printf("Enter DESTINATION_IP DESTINATION_PORT NUMBER_OF_PACKETS MESSAGEn");
return;
}
strcpy(message,argv[4]);
number_of_packets=atoi(argv[3]);
destination_ip=malloc(sizeof argv[1] + 1);memset(destination_ip,'', sizeof destination_ip);
destination_port=malloc(sizeof argv[2] + 1);memset(destination_port,'', sizeof destination_port);
strcpy(destination_ip,argv[1]);
strcpy(destination_port,argv[2]);
int gai;
char server_ip[100];memset(server_ip,'',sizeof(server_ip));
struct addrinfo hints,*server;
memset(&hints,0,sizeof hints);
hints.ai_family     = AF_UNSPEC;
hints.ai_socktype   = SOCK_STREAM;
if((gai=getaddrinfo(CHANNEL_IP,CHANNEL_PORT,&hints,&server)) != 0){
printf("GetAddrInfo Error: %sn",gai_strerror(gai));
return;
}
if((socket_fd = socket(server->ai_family, server->ai_socktype, server->ai_protocol)) == -1){
printf("Socket Errorn");
}
if(connect(socket_fd,server->ai_addr,server->ai_addrlen) == -1){
printf("Connect Errorn");
}
freeaddrinfo(server);
inet_ntop(server->ai_family, get_in_addr((struct sockaddr*)server->ai_addr), server_ip, sizeof(server_ip));
printf("Connected to: %sn",server_ip);
divide_message_and_send_packets();
while(1){} // busy wait taht simulates future work
}

数据包代码.c(包含与数据包相关的功能(:-

#include "helper.c"
typedef struct StupidAssignment{
long length;
char* destination_ip;
char* destination_port;
long timestamp;
long uid;
char* message;
}packet;
int receive_packet(int socket,packet** p1){
packet* p = *p1;
int remaining=0;int i;
int received=0;
long content_length=0;
remaining=11;
char buffer[MESSAGE_BUFFER_LEN];memset(buffer,'',sizeof(buffer));
while(remaining>0){
int t1 = recv(socket, buffer+received, remaining, 0);
if(t1==0)return 0;
remaining-=t1;
received+=t1;
}
content_length = read_long(buffer, received);
received=0;
remaining=content_length;p->length=content_length;
memset(buffer,'',sizeof(buffer));
while(remaining>0){
int t1 = recv(socket, buffer+received, remaining, 0);
if(t1==0)return 0;
remaining-=t1;
received+=t1;
}
char part[MESSAGE_BUFFER_LEN];memset(part,'',sizeof(part));int part_len=0;int nlmkr=0;
for(i=0;i<=content_length;i+=1){
if(buffer[i]=='n' || i==content_length){
nlmkr+=1;
if(nlmkr==1)    read_char(&(p->destination_ip), part, part_len);
else if(nlmkr==2)   read_char(&(p->destination_port), part, part_len);
else if(nlmkr==3)   p->timestamp = read_long(part, part_len);
else if(nlmkr==4)   p->uid = read_long(part, part_len);
else if(nlmkr==6)   read_char(&(p->message), part, part_len);
part_len=0;memset(part, '', sizeof part);
}else{
part[part_len++]=buffer[i];
}
}
return 1;
}
void send_packet(int socket,packet *p){
char temp[MESSAGE_BUFFER_LEN];memset(temp,'',sizeof temp);
strcat(temp,p->destination_ip);strcat(temp,"n");
strcat(temp,p->destination_port);strcat(temp,"n");
snprintf(temp+strlen(temp),100,"%ldn",p->timestamp);
snprintf(temp+strlen(temp),100,"%ldn",p->uid);
// write_long(p->timestamp,temp);strcat(temp,"n");
// write_long(p->uid,temp);strcat(temp,"n");
strcat(temp,"n");
strcat(temp,p->message);
char buffer[MESSAGE_BUFFER_LEN];memset(buffer, '', sizeof buffer);
p->length = strlen(temp);
snprintf(buffer,100,"%10ldn",strlen(temp));
strcat(buffer, temp);
sendAll(buffer,socket);
}
void display_packet(packet* p){
printf("----PACKET START----n");
printf("%ldn",p->length);
printf("%sn",p->destination_ip);
printf("%sn",p->destination_port);
printf("%ldn",p->timestamp);
printf("%ldn",p->uid);
printf("%sn",p->message);
printf("----PACKET END-----n");
}

helper.c的代码(所有其他代码使用的一些函数(:-

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <netdb.h>
#include <poll.h>
#include <arpa/inet.h>
#include <sys/wait.h>
#include <signal.h>
#include <time.h>
#include <sys/stat.h>
#include <ctype.h>
#include <fcntl.h>
#include <pthread.h>
#define MESSAGE_BUFFER_LEN 20480
void read_char(char** into, char* from, int length){
*into = malloc(length+1);memset(*into, '', sizeof *into);
strcpy(*into, from);
// printf("%sn",into);
}
long read_long(char* from, int length){
int i;long temp=0;
for(i=0;i<length;i+=1){
if(isdigit(*(from+i))){
temp=temp*10;temp+=(long)(*(from+i) - '0');         
}
}
return temp;
}
void write_long(long t1,char* m){
int mkr=0;
char temp[100];memset(temp,'',sizeof temp);
while(t1!=0){
temp[mkr] = ((int)(t1%10)) + '0';
t1 = t1/10;
}
for(mkr=strlen(temp)-1;mkr>=0;mkr-=1){
*(m+strlen(m))=temp[mkr];
}
}
void *get_in_addr(struct sockaddr* sa){
if(sa->sa_family == AF_INET){
return &(((struct sockaddr_in *)sa)->sin_addr);
}else{
return &(((struct sockaddr_in6*)sa)->sin6_addr);
}
}

void sigchld_handler(int s){
int saved_errno=errno;
while(waitpid(-1, NULL, WNOHANG) > 0);
errno = saved_errno;
}
int sendAll(char* data_to_send,int socket_fd){
int bytesleft = strlen(data_to_send);
int total=0;int n;
while(bytesleft>0){
n = send(socket_fd,data_to_send + total, bytesleft, 0);
if(n==-1)break;
total+=n;
bytesleft-=n;
}
return n==-1?-1:0;
}
long get_time_in_ns(){
struct timespec start;clock_gettime(CLOCK_REALTIME,&start);
long ct = ((long)start.tv_sec)*1e9 + ((long)start.tv_nsec);
return ct;
}

该代码根本没有文档记录。

对于TCP,在套接字上调用shutdown。更完整:

  1. 设置一些线程将检查的标志,以便在解除阻止时知道关闭正在进行中
  2. 执行任何需要关闭连接的操作,完成后最终在套接字上调用shutdown。如果你需要调用shutdown作为拆卸过程的一部分,那就去做。如果没有,当你完成拆卸过程(如果有的话(shutdown时,双向连接
  3. 在任何情况下,都不要在套接字上调用close,直到您能够100%确认没有线程正在或可能正在尝试访问套接字。这一点极为重要

对于UDP,向套接字发送数据报。这将在线程接收到伪数据报时解除阻塞。

最新更新