Linux C 端通过 3 次 write() 调用经 TCP 发送 3 个数据块,C# 端需要调用多少次 BeginReceive() 才能全部接收?



Linux 上的 C 程序通过 3 次 write() 调用经 TCP 发送的 3 个数据块,在 Windows C# 端的 BeginReceive() 中是作为同样的 3 个数据块被接收,还是作为一个连续的数据块被接收,抑或是 BeginReceive() 被调用时已到达多少字节就接收多少字节?

Linux上的"C"应用程序通过TCP连接,通过3次对write()的调用向Windows C#应用程序发送消息,该应用程序使用BeginReceive()进行接收。

BeginReceive() 是否需要调用三次,才能分别接收 write() 发送的三个数据块?或者,BeginReceive() 每次接收到的字节数等于调用时 Windows 已经收到的字节数——可能是 3 次 write() 发送的全部字节,也可能只是其中一部分——因此应当反复调用 BeginReceive(),直到收到完整的消息?

Linux C 应用程序运行在嵌入式 TI ARM 上;在同一个机箱内,Windows C# 应用程序运行在一台单板计算机(SBC)上。ARM 通过以太网直接连接到 SBC。

ARM和SBC之间的通信有时无法在启动时启动,我正在对源代码进行逆向工程,以检查是否存在糟糕的设计。

ARM端是TCP侦听器,Windows客户端启动TCP连接。

使用MontaVista(R)Linux(R)专业版5.0.0(0702774)以及Windows-7 Visual Studio 2010 Visual C#。

以下是 ARM 端的发送代码和 Windows 端的接收代码。

Linux 'C' 端

char msgBuffer[64];
sprintf(msgBuffer, START_MSG_ENVELOPE, msgId++, ack);
write(connection->fd, msgBuffer, strlen(msgBuffer));
write(connection->fd, msg, strlen(msg));
write(connection->fd, END_MSG_ENVELOPE, strlen(END_MSG_ENVELOPE));

这是对应的 Windows C# 端

/// <summary>
/// Starts an asynchronous connect of <paramref name="clientSocket"/> to the configured
/// address on <paramref name="iPortNo"/>, waits up to five seconds for the connect
/// callback to signal, and then starts the asynchronous receive loop.
/// </summary>
/// <param name="clientSocket">Socket to connect; closed here if the wait times out.</param>
/// <param name="iPortNo">Remote TCP port.</param>
/// <exception cref="TimeoutException">The connect callback did not signal within five seconds.</exception>
/// <exception cref="Exception">The callback signalled but reported an unsuccessful connection.</exception>
private static void makeConnection(Socket clientSocket, int iPortNo)
{
    TimeoutObject.Reset();
    socketexception = null;

    IPAddress ip = IPAddress.Parse(ipAddress);
    try
    {
        // The end point itself is not used below; constructing it is kept
        // because the constructor call is part of the original behavior.
        IPEndPoint ipEndPoint = new IPEndPoint(ip, iPortNo);

        // Connect to the remote host; CallBackMethod signals TimeoutObject when done.
        clientSocket.BeginConnect(ip, iPortNo, new AsyncCallback(CallBackMethod), clientSocket);

        bool callbackSignalled = TimeoutObject.WaitOne(5 * 1000, false);    // 5 secs connection timeout
        if (!callbackSignalled)
        {
            clientSocket.Close();
            throw new TimeoutException(VNResourceManager.Instance.GetString(VNMessages.CONNECTION_TO_DAM_TIMED_OUT));
        }

        if (!IsConnectionSuccessful)
        {
            string msg = VNResourceManager.Instance.GetString(VNMessages.DAM_NOT_FOUND);
            if (socketexception != null)
            {
                msg += ": " + socketexception.Message;
            }
            throw new Exception(msg);
        }

        // Watch for data (asynchronously).
        WaitForData();
    }
    catch (SocketException e)
    {
        // Remember the failure for later reporting, then let the caller see it.
        socketexception = e;
        throw;
    }
}
/// <summary>
/// Completion callback for the BeginConnect issued by makeConnection.
/// Records the outcome in IsConnectionSuccessful / socketexception and then
/// signals TimeoutObject so the waiting thread can continue.
/// </summary>
/// <param name="asyncresult">Async result whose AsyncState is the connecting Socket.</param>
private static void CallBackMethod(IAsyncResult asyncresult)
{
    try
    {
        IsConnectionSuccessful = false;
        Socket socket = (Socket)asyncresult.AsyncState;

        // Always complete the asynchronous operation. EndConnect throws the exception
        // that describes why the connect failed (or ObjectDisposedException if the socket
        // was closed meanwhile); the original skipped it whenever the socket was not
        // connected, so the failure reason was never captured in socketexception.
        socket.EndConnect(asyncresult);
        IsConnectionSuccessful = socket.Connected;
    }
    catch (Exception ex)
    {
        IsConnectionSuccessful = false;
        socketexception = ex;
    }
    finally
    {
        // Wake the thread blocked in makeConnection regardless of the outcome.
        TimeoutObject.Set();
    }
}
/// <summary>
/// Posts one asynchronous receive on clientSocket. The data lands in a fresh
/// CSocketPacket buffer and OnDataReceived is invoked on completion.
/// A SocketException raised while posting the receive is forwarded to the
/// error subscribers instead of being thrown.
/// </summary>
public static void WaitForData()
{
    try
    {
        // The callback delegate is created once and reused for every receive.
        if (asyncCallBack == null)
        {
            asyncCallBack = new AsyncCallback(OnDataReceived);
        }

        CSocketPacket packet = new CSocketPacket { thisSocket = clientSocket };
        byte[] buffer = packet.dataBuffer;
        asyncResult = clientSocket.BeginReceive(buffer, 0, buffer.Length, SocketFlags.None, asyncCallBack, packet);
    }
    catch (SocketException socketError)
    {
        notifyErrorEventSubscribers(socketError);
    }
}
/// <summary>
/// Encodes <paramref name="message"/> as ASCII and sends it synchronously on clientSocket.
/// </summary>
/// <param name="message">Text to send.</param>
/// <exception cref="SocketException">Rethrown after the error subscribers have been notified.</exception>
public static void send(string message)
{
    try
    {
        clientSocket.Send(System.Text.Encoding.ASCII.GetBytes(message));
    }
    catch (SocketException socketError)
    {
        notifyErrorEventSubscribers(socketError);
        throw;
    }
}
//[MethodImpl(MethodImplOptions.Synchronized)]
/// <summary>
/// Completion callback for the BeginReceive posted by WaitForData.
/// Appends the received bytes (decoded as ASCII) to receivedMsg, cuts out every
/// complete "&lt;/message&gt;"-terminated frame, posts the next receive, and then
/// deserializes and dispatches the extracted frames.
/// </summary>
/// <remarks>
/// TCP delivers a byte stream: one callback may carry a fragment of a message,
/// exactly one message, or several messages, independent of how the sender
/// split its writes. Incomplete data therefore stays in receivedMsg until a
/// later callback completes it.
/// </remarks>
/// <param name="result">Async result whose AsyncState is the CSocketPacket passed to BeginReceive.</param>
public static void OnDataReceived(IAsyncResult result)
{
    try
    {
        CSocketPacket packet = (CSocketPacket)result.AsyncState;
        int messageSize = packet.thisSocket.EndReceive(result);
        Console.WriteLine(">>>>>>>>>  messageSize = " + messageSize); // !!!

        if (messageSize <= 0)
        {
            // Zero bytes means the remote side shut the connection down. Posting another
            // receive would complete immediately with zero again and spin forever.
            Logger.Log(EnumLoggingLevel.Error, "OnDataReceived: ", "Remote host closed the connection (EndReceive returned 0).");
            return;
        }

        // Decode exactly the bytes received; no padding character is produced,
        // so nothing has to be stripped from the end afterwards.
        string replyMessage = System.Text.Encoding.ASCII.GetString(packet.dataBuffer, 0, messageSize);
        lock (syncLock)  // LastIndexOf accesses the current culture info, which is cleared in the WM_TIMECHANGE handler
        {
            if (replyMessage.LastIndexOf(Terminator) > 0)
            {
                replyMessage = replyMessage.Remove(replyMessage.LastIndexOf(Terminator), 1);
            }
        }

        // Update the shared buffer BEFORE re-arming the receive: once WaitForData() has run,
        // the next callback may execute concurrently and must not race on receivedMsg.
        receivedMsg += replyMessage;
        List<string> frames = ExtractCompleteXmlFrames();

        // Continue waiting for data on the socket. Dispatch happens afterwards so that
        // handlers waiting for a reply from the peer can still have it received.
        WaitForData();

        foreach (XmlMessage message in DeserializeXmlFrames(frames))
        {
            DispatchXmlMessage(message);
        }
    }
    catch (ObjectDisposedException objectDisposedException)
    {
        // The socket has been closed.
        notifyErrorEventSubscribers(objectDisposedException);
    }
    catch (SocketException se)
    {
        if (se.SocketErrorCode == SocketError.ConnectionReset)
        {
            // Connection reset by peer (error 10054): only logged, no reconnect is attempted here.
            Logger.Log(EnumLoggingLevel.Error, "OnDataReceived: ", se.ToString());
        }
        else
        {
            notifyErrorEventSubscribers(se);
        }
    }
}

// Removes every complete "</message>"-terminated frame from the front of receivedMsg and returns them in order.
private static List<string> ExtractCompleteXmlFrames()
{
    const string closingTag = "</message>";
    List<string> frames = new List<string>();

    int index;
    while ((index = receivedMsg.IndexOf(closingTag, StringComparison.Ordinal)) != -1)
    {
        int frameLength = index + closingTag.Length;
        // Line breaks left over between two messages must not precede the XML text.
        frames.Add(receivedMsg.Substring(0, frameLength).TrimStart(new char[] { '\r', '\n' }));
        receivedMsg = receivedMsg.Remove(0, frameLength);
    }

    return frames;
}

// Deserializes each frame into an XmlMessage; a frame that fails is reported to the error subscribers and skipped.
private static List<XmlMessage> DeserializeXmlFrames(List<string> frames)
{
    List<XmlMessage> messages = new List<XmlMessage>();
    foreach (string frame in frames)
    {
        try
        {
            using (StringReader text = new StringReader(frame))
            using (XmlReader reader = XmlReader.Create(text))
            {
                messages.Add((XmlMessage)XmlMessage.GetXmlSerializer().Deserialize(reader));
            }
        }
        catch (InvalidOperationException error)
        {
            string strErrorMessage = error.Message;
            if (error.InnerException != null)
            {
                strErrorMessage += "\r\n" + error.InnerException.Message;
            }
            notifyErrorEventSubscribers(new Exception(strErrorMessage + "\r\n-----------------------------------------------------------------\r\n" + frame));
        }
    }

    return messages;
}

// Acknowledges the message when required, raises dataReceivedEvent, and releases a sender waiting for a matching ack.
private static void DispatchXmlMessage(XmlMessage message)
{
    if (message.ack_required && (message.update == null))
    {
        XmlMessage messageAcknowledge = new XmlMessage();
        messageAcknowledge.ack_required = false;
        messageAcknowledge.ack = new ack();
        messageAcknowledge.ack.success = true;
        messageAcknowledge.ack.id = message.id;
        try
        {
            sendMessage(messageAcknowledge);
        }
        catch (Exception ex)
        {
            Logger.Log(EnumLoggingLevel.Error, "SocketCommunicationXMl.OnDataReceived", ex.Message);
        }
    }

    // Copy to locals so a concurrent change cannot null the reference between check and use.
    var handler = dataReceivedEvent;
    if (handler != null)
    {
        handler(new object(), new DataReceivedEventArgs(message));
    }

    var pendingAck = ackRequiredMsg;
    if ((pendingAck != null) && (message.ack != null))
    {
        if ((message.ack.id == pendingAck.id) && message.ack.success)
        {
            eventWaitForAck.Set();
        }
    }
}

正如其他人所提到的,TCP是一种流协议,因此您永远无法判断需要多少DataReceived回调才能获得所有100个字节。可能是1,也可能是100。

接收代码相当复杂,性能可以提高(字符串操作太多)。很难说是否存在控制流问题。我建议将DataReceived方法分解以简化。以下是一个合理的框架:

// Suggested skeleton for the receive callback; the steps are listed as comments only.
public static void OnDataReceived(IAsyncResult result)
{
// 1) Copy all data received into a buffer of some sort, like a MemoryStream.
// 2) Dispatch any/all full messages that have been received
//    to a queue or handler method (maybe handle them on another thread).
//    Hold onto any leftover bytes received that are part of the next message.
// 3) Call BeginReceive() again.
}

此外,如果使用"长度前缀"的消息格式,可以简化消息的"分帧"(framing)处理。

正如 @nos 已经指出的,无论发送端应用程序如何调用写入,接收调用的次数都不一定等于发送调用的次数。

另请参阅:TcpClient 的 NetworkStream 何时完成一次读取操作?

最新更新