C++套接字未完成发送较长消息的数据



我有一个函数,我试图发送205453字节的消息。下面的函数发送输出直到143080字节,然后send(..)调用连续3次返回-1 size_sent bytes,此时超时。

我有一个python测试客户端,它在while循环中接收消息。它进行两次调用并接收143080字节,但它没有完成接收消息。

我认为这可能是一个超时问题,所以我禁用了c++套接字超时,但这没有帮助。

setsockopt(sock_fd, SOL_SOCKET,SO_SNDTIMEO,(const char*)&time_val_struct,sizeof(time_val_struct));`

c++函数如下:

size_t network::send_full_msg(int sock_fd, char *write_buf, size_t write_buf_length, std::chrono::microseconds timeout)
{
debug_log log("network_interface", "send_full_msg");
try{

log.msg("Preparing to send an entire message through. msg size is...." + to_string(write_buf_length));
const size_t chunk_size = 16000;        //will read 16000 bytes at a time
fcntl(sock_fd, F_SETFL, O_NONBLOCK); //makes the socket nonblocking
log.msg("Set socket non blocking..." + to_string(write_buf_length));
struct timeval time_val_struct;
time_val_struct.tv_sec = 0;
time_val_struct.tv_usec = 0;
setsockopt(sock_fd, SOL_SOCKET,SO_SNDTIMEO,(const char*)&time_val_struct,sizeof(time_val_struct));
log.msg("Turned off socket timeout..." + to_string(write_buf_length));

size_t pos_in_buf = 0; //starts at 0 and is incremented to write to the right location
ssize_t size_sent = 0; //the size of the values obtained from a recv
int num_neg_count=0;
auto start_time = chrono::high_resolution_clock::now();
while (pos_in_buf < write_buf_length)
{
auto current_time = chrono::high_resolution_clock::now();
auto duration = chrono::duration_cast<chrono::microseconds>(current_time - start_time);
//cout << "Duration: " << duration.count() << endl;
//cout << "Timeout: " << timeout.count() << endl;
if (duration > timeout || num_neg_count>3) //timeout or 3 consecutive failed writes
{
log.msg("Timeout exceeded");
break;
}
//remaining buf size is the total buf length minus the position (plus 1?)
size_t remaining_buf_size = write_buf_length - pos_in_buf;                                     //avoids a segmentation fault
size_t bytes_to_write = remaining_buf_size > chunk_size ? chunk_size : remaining_buf_size; //works to prevent a segmentation fault
size_sent = send(sock_fd, write_buf+pos_in_buf, bytes_to_write, 0);
log.msg("Sent bytes..." + to_string(size_sent));
log.msg("Pos in buf..." + to_string(pos_in_buf));
log.msg("Bytes to write..." + to_string(bytes_to_write));

// log.msg("size_recv: " + to_string(size_recv));
// log.msg("bytes to read: " + to_string(bytes_to_read));
if (size_sent < 0)
{
num_neg_count++; //if there are 3 consecutive failed writes we will quit
this_thread::sleep_for(chrono::microseconds(100)); //needs to wait to try and get more data
continue;
}else{
num_neg_count = 0; //reset the failed writes
pos_in_buf += size_sent;
}
//log.msg("Data received! Length: " + to_string(size_recv));

}
log.msg("Total data length sent was: " + to_string(pos_in_buf));
if(pos_in_buf == 0)
return -1; //error, no data received
return pos_in_buf; //the full size of the message received
}catch(exception &e){
cout << " Exception in network socket " << e.what() << endl;
return -1;
}
}

c++输出如下:

[ network_interface/send_full_msg ] Preparing to send an entire message through. msg size is....205453
[ network_interface/send_full_msg ] Set socket non blocking...205453
[ network_interface/send_full_msg ] Turned off socket timeout
[ network_interface/send_full_msg ] Sent bytes...16000
[ network_interface/send_full_msg ] Pos in buf...0
[ network_interface/send_full_msg ] Bytes to write...16000
[ network_interface/send_full_msg ] Sent bytes...16000
[ network_interface/send_full_msg ] Pos in buf...16000
[ network_interface/send_full_msg ] Bytes to write...16000
[ network_interface/send_full_msg ] Sent bytes...16000
[ network_interface/send_full_msg ] Pos in buf...32000
[ network_interface/send_full_msg ] Bytes to write...16000
[ network_interface/send_full_msg ] Sent bytes...16000
[ network_interface/send_full_msg ] Pos in buf...48000
[ network_interface/send_full_msg ] Bytes to write...16000
[ network_interface/send_full_msg ] Sent bytes...16000
[ network_interface/send_full_msg ] Pos in buf...64000
[ network_interface/send_full_msg ] Bytes to write...16000
[ network_interface/send_full_msg ] Sent bytes...16000
[ network_interface/send_full_msg ] Pos in buf...80000
[ network_interface/send_full_msg ] Bytes to write...16000
[ network_interface/send_full_msg ] Sent bytes...16000
[ network_interface/send_full_msg ] Pos in buf...96000
[ network_interface/send_full_msg ] Bytes to write...16000
[ network_interface/send_full_msg ] Sent bytes...16000
[ network_interface/send_full_msg ] Pos in buf...112000
[ network_interface/send_full_msg ] Bytes to write...16000
[ network_interface/send_full_msg ] Sent bytes...15080
[ network_interface/send_full_msg ] Pos in buf...128000
[ network_interface/send_full_msg ] Bytes to write...16000
[ network_interface/send_full_msg ] Sent bytes...-1
[ network_interface/send_full_msg ] Pos in buf...143080
[ network_interface/send_full_msg ] Bytes to write...16000
[ network_interface/send_full_msg ] Sent bytes...-1
[ network_interface/send_full_msg ] Pos in buf...143080
[ network_interface/send_full_msg ] Bytes to write...16000
[ network_interface/send_full_msg ] Sent bytes...-1
[ network_interface/send_full_msg ] Pos in buf...143080
[ network_interface/send_full_msg ] Bytes to write...16000
[ network_interface/send_full_msg ] Sent bytes...-1
[ network_interface/send_full_msg ] Pos in buf...143080
[ network_interface/send_full_msg ] Bytes to write...16000
[ network_interface/send_full_msg ] Timeout exceeded
[ network_interface/send_full_msg ] Total data length sent was: 143080

我已经尝试更改套接字超时,我也尝试更改正在发送的字节大小从8000字节和16000字节。

输出显示它发送第一个16000字节x 8,然后第九次它只发送15080字节。这很奇怪,因为它应该再发送一个16000字节。

我检查了输入write_buf是否有问题,但它是从一个字符串变量创建的,如下所示:

send_full_msg(client_fd, const_char<char*>(response.c_str()), response.length(), chrono::microseconds((int)5e6);

同样,如果我重新发送消息,即使大小更改为207801,它总是只发送143080字节。

我使用select(..)解决了这个问题

size_t network::send_full_msg(int sock_fd, char *write_buf, size_t write_buf_length, std::chrono::microseconds timeout) {
debug_log log("network_interface", "send_full_msg");
try{
fd_set set;
struct timeval socktimeout;
int rv;
FD_ZERO(&set); /* clear the set */
FD_SET(sock_fd, &set); /* add our file descriptor to the set */
socktimeout.tv_sec = std_thread_timeout_microseconds/1000000;
socktimeout.tv_usec = 0;

log.msg("Preparing to send an entire message through. msg size is...." + to_string(write_buf_length));
const size_t chunk_size = 16000;        //will read 16000 bytes at a time
//fcntl(sock_fd, F_SETFL, O_NONBLOCK); //makes the socket nonblocking
//log.msg("Set socket non blocking..." + to_string(write_buf_length));
//struct timeval time_val_struct;
//time_val_struct.tv_sec = 0;
//time_val_struct.tv_usec = 0;
//setsockopt(sock_fd, SOL_SOCKET,SO_SNDTIMEO,(const char*)&time_val_struct,sizeof(time_val_struct));
//log.msg("Turned off socket timeout");

size_t pos_in_buf = 0; //starts at 0 and is incremented to write to the right location
ssize_t size_sent = 0; //the size of the values obtained from a recv
int num_neg_count=0;
auto start_time = chrono::high_resolution_clock::now();
log.msg("Entering loop non block on write...");

while (pos_in_buf < write_buf_length)
{
auto current_time = chrono::high_resolution_clock::now();
auto duration = chrono::duration_cast<chrono::microseconds>(current_time - start_time);

rv = select(sock_fd + 1, NULL, &set, NULL, &socktimeout);
if(rv==0){
log.msg("Select timeout...num neg count is: " + to_string(num_neg_count));
//timeout
num_neg_count++;
if(num_neg_count > 3){ //three timeouts in a row
return pos_in_buf == 0 ? -1 : pos_in_buf;
}else{
continue;
}
}
else if(rv==-1){
//do nothing if this hits the timeout it will break out
log.msg("Select error...num neg count is: " + to_string(num_neg_count));
}
else{
//there is data to be handled
log.msg("Select is saying socket is available for sending...");
//remaining buf size is the total buf length minus the position (plus 1?)
size_t remaining_buf_size = write_buf_length - pos_in_buf;                                     //avoids a segmentation fault
size_t bytes_to_write = remaining_buf_size > chunk_size ? chunk_size : remaining_buf_size; //works to prevent a segmentation fault
size_sent = send(sock_fd, write_buf+pos_in_buf, bytes_to_write, 0);
log.msg("Sent bytes..." + to_string(size_sent));
log.msg("Pos in buf..." + to_string(pos_in_buf));
log.msg("Bytes to write..." + to_string(bytes_to_write));
log.msg("Remaining buf size..." + to_string(remaining_buf_size));

// log.msg("size_recv: " + to_string(size_recv));
// log.msg("bytes to read: " + to_string(bytes_to_read));
if (size_sent < 0)
{
perror("Socket send returned -1");
num_neg_count++; //if there are 3 consecutive failed writes we will quit
//this_thread::sleep_for(chrono::microseconds(100)); //needs to wait to try and get more data
continue;
}else{
num_neg_count = 0; //reset the failed writes
pos_in_buf += size_sent;
}
//log.msg("Data received! Length: " + to_string(size_recv));
}
//cout << "Duration: " << duration.count() << endl;
//cout << "Timeout: " << timeout.count() << endl;
if (duration > timeout || num_neg_count>3) //timeout or 3 consecutive failed writes
{
log.msg("Timeout exceeded");
break;
}


}
log.msg("Total data length sent was: " + to_string(pos_in_buf));
if(pos_in_buf == 0)
return -1; //error, no data received
return pos_in_buf; //the full size of the message received
}catch(exception &e){
cout << " Exception in network socket " << e.what() << endl;
return -1;
} }

最新更新