c-select()未检测到传入数据



目标:N个节点(运行在不同的机器上)应该通过建立TCP连接来相互通信。发送和接收消息由进程创建的2个线程完成。最初,主进程将所有节点相互连接,创建2个线程,并为其提供一个文件描述符列表,线程可以使用该列表来发送和接收数据。下面的结构由主进程填充并传递给线程。

typedef struct
{
char hostName[MAXIMUM_CHARACTERS_IN_HOSTNAME];  /* Host name of the node */
char portNumber[MAXIMUM_PORT_LENGTH];           /* Port number of the node */
char nodeId[MAXIMUM_NODE_ID_LENGTH];            /* Node ID of the node */
int socketFd;                                   /* Socket file descriptor */
int socketReady;        /* Flag to indicate if socket information is filled */
}SNodeInformation;

PS:socketFd是accept()或socket()接收的套接字描述符,具体取决于连接的建立方式(侦听来自节点的连接或连接到节点)。

使用大小为MAX_NUM_of_NODES的SNodeInformation数组。

send线程遍历nodeInformation并向所有节点发送一条消息"Hello",如下所示。

void *sendMessageThread(void *pNodeInformation) {
int i;
int ownNodeId;
int bytesSent = 0;
char ownHostName[MAXIMUM_CHARACTERS_IN_HOSTNAME];
SNodeInformation *nodeInformation = (SNodeInformation *) pNodeInformation;
SNodeInformation *iterNodeInformation;
printf("SendMessageThread: Send thread createdn");
if(gethostname(ownHostName, MAXIMUM_CHARACTERS_IN_HOSTNAME) != 0) {
perror("Error: sendMessageThread, gethostname failedn");
exit(1);
}
for(i=0, iterNodeInformation=nodeInformation ; i<MAXIMUM_NUMBER_OF_NODES ; i++, iterNodeInformation++) {
if(strcmp((const char*) iterNodeInformation->hostName, (const char*) ownHostName) != 0)  {
/* Send message to all nodes except yourself */
bytesSent = send(iterNodeInformation->socketFd, "Hello", 6, 0);
if(bytesSent == -1) {
printf("Error: sendMessageThread, sending failed, code: %s FD %dn",     strerror(errno), iterNodeInformation->socketFd);
}
}
}
pthread_exit(NULL);
}

接收线程遍历nodeInformation,设置一个文件描述符集,并使用select等待传入数据,如下所示。

void *receiveMessageThread(void *pNodeInformation)
{
int i;
int fileDescriptorMax = -1;
int doneReceiving = 0;
int numberOfBytesReceived = 0;
int receiveCount = 0;
fd_set readFileDescriptorList;
char inMessage[6];
SNodeInformation *nodeInformation = (SNodeInformation *) pNodeInformation;
SNodeInformation *iterNodeInformation;
printf("ReceiveMessageThread: Receive thread createdn");
/* Initialize the read file descriptor */
FD_ZERO(&readFileDescriptorList);
for(i=0, iterNodeInformation=nodeInformation ; i<MAXIMUM_NUMBER_OF_NODES ; i++, iterNodeInformation++) {
FD_SET(iterNodeInformation->socketFd, &readFileDescriptorList);
if(iterNodeInformation->socketFd > fileDescriptorMax) {
fileDescriptorMax = iterNodeInformation->socketFd;
}
}
printf("ReceiveMessageThread: fileDescriptorMax:%dn", fileDescriptorMax);
while(!doneReceiving) {
if (select(fileDescriptorMax+1, &readFileDescriptorList, NULL, NULL, NULL) == -1) {
perror("Error receiveMessageThread, select failed n");
return -1;
}
for(i=0 ; i<fileDescriptorMax ; i++) {
if (FD_ISSET(i, &readFileDescriptorList)) {
/* Check if any FD was set */
printf("ReceiveThread: FD set %dn", i);
/* Receive data from one of the nodes */
if ((numberOfBytesReceived = recv(i, &inMessage, 6, 0)) <= 0) {
/* Got error or connection closed by client */
if (numberOfBytesReceived == 0) {
/* Connection closed */
printf("Info: receiveMessageThread, node %d hung upn", i);
}
else {
perror("Error: receiveMessageThread, recv FAILEDn");
}
close(i);
/* Remove from Master file descriptor set */
FD_CLR(i, &readFileDescriptorList); 
doneReceiving = 1;
}
else {
/* Valid data from a node */
inMessage[6] = '';
if(++receiveCount == MAXIMUM_NUMBER_OF_NODES-1) {
doneReceiving = 1;
}
printf("ReceiveThread: %s received, count: %dn", inMessage, rece    iveCount);
}
}
}
}
pthread_exit(NULL);
}

预期输出:我只尝试了两个进程,P1(首先启动)和P2在机器1上运行,另一个在机器2上运行。机器中的两个进程都应该首先连接,然后线程应该发送和接收消息"Hello"并退出。

观察到的输出:P1能够发送消息,P2(接收器线程)能够接收消息"Hello"。但是P1(接收器线程)无法从P2(发送线程)获得消息。两台机器中的应用程序代码是相同的,但每次首先启动的进程都不会从另一个进程获得消息。我添加了一个打印,只是为了检查是否设置了一些文件描述符,但我看不到P1的描述符,只看到P2的描述符。接收过程中的发送没有失败,它返回6。我检查了文件描述符的最大值,它是正确的。

如果我首先启动P2,然后启动P1,那么我可以看到P1接收来自P2的消息并且存在,而P2无限地等待来自P1的消息。

我不确定问题是因为套接字描述符的使用不正确还是因为线程?

两个问题:

1正在设置的文件描述符的循环测试不包括放入该集合的所有文件描述符。(该编程错误预计是操作程序中所述故障的原因。)

2传递给select()的文件描述符集由select()修改,因此该集需要在select()再次初始化之前重新初始化。(只有当接收到来自多个套接字的数据时,编程错误才会显著。)

请参阅OP代码的以下mod/s:

void *receiveMessageThread(void *pNodeInformation)
{
...
printf("ReceiveMessageThread: Receive thread createdn");
while(!doneReceiving) {
/* Initialize the read-set of file descriptors */
/* Issue 2 fixed from here ... */
FD_ZERO(&readFileDescriptorList);
for(i=0, iterNodeInformation=nodeInformation ; i<MAXIMUM_NUMBER_OF_NODES ; i++, iterNodeInformation++) {
FD_SET(iterNodeInformation->socketFd, &readFileDescriptorList);
if (iterNodeInformation->socketFd > fileDescriptorMax) {
fileDescriptorMax = iterNodeInformation->socketFd;
}
}
/* ... up to here. */
printf("ReceiveMessageThread: fileDescriptorMax:%dn", fileDescriptorMax);
if (select(fileDescriptorMax+1, &readFileDescriptorList, NULL, NULL, NULL) == -1) {
perror("Error receiveMessageThread, select failed n");
return -1;
}
for(i=0 ; i <= fileDescriptorMax ; i++) { /* Issue 1 fixed here. */
...

最新更新