我进行了模拟测试,以了解我在UDP通信中面临的问题。
设置
我有一个运行 4 个 UDP 客户端的主机,每个客户端都在自己的线程(T1、T2、T3 和 T4)中。T1 和 T2 共享一个名为 socket 的数据报套接字对象,而 T3 和 T4 共享一个名为 socket2 的数据报套接字对象。
T1 和T2 正在发送和接收来自 IP (例如) udpServer1 的 UDP 服务器的回声,而 T3 和 T4 正在与 udpServer2 通信。
问题
在并行运行线程时,我能够同步共享相同 DatagramSocket 和 Runnable 对象的 T1 和 T2。但是,当尝试运行使用不同 DatagramSocket 和 Runnable 对象的 T1 和 T3 时,T3 总是因 SocketTimeoutException 而失败,但 T1 可以毫无问题地发送和接收数据包。
总结:
并行运行 T1 和 T2(或 T3 和 T4),共享相同的 DatagramSocket 和 Runnable 对象 --> OK
并行运行 T1 和 T3,每个都使用自己的 DatagramSocket 和 Runnable 对象 -->其中一个对象 SocketTimeoutException。
问题
为什么 T3 不断获得 SocketTimeoutException,即使它在自己的 DatagramSocket 上并在自己的线程上运行?我在这里做错了什么?
任何帮助都非常感谢。谢谢。
测试代码
@Test
public void sendRemoteTest() throws InterruptedException, IOException {
DatagramSocket socket = new DatagramSocket(9987, InetAddress.getByName(localMachineIp));
DatagramSocket socket2 = new DatagramSocket(9988, InetAddress.getByName(localMachineIp));
UdpClientRunnable runnable = new UdpClientRunnable(socket, 1);
UdpClientRunnable runnable2 = new UdpClientRunnable(socket2, 1);
Thread t1 = new Thread(runnable);
t1.setName("T1");
Thread t2 = new Thread(runnable);
t2.setName("T2");
Thread t3 = new Thread(runnable2);
t3.setName("T3");
Thread t4 = new Thread(runnable2);
t4.setName("T4");
t1.start();
t2.start();
t3.start();
t4.start();
Thread.sleep(10000);
t1.join();
t2.join();
t3.join();
t4.join();
}
private byte toByte(int num) {
return (byte) ((byte) (0xFF) & num);
}
public class UdpClientRunnable implements Runnable {
private DatagramSocket socket;
private long delayMillis;
private byte[] dataToSend;
public UdpClientRunnable(DatagramSocket socket, long delayMilis) {
this.socket = socket;
this.delayMillis = delayMilis;
}
@Override
public synchronized void run() {
byte[] data1 = new byte[] { toByte(0x01), toByte(0x01), toByte(0x01),
toByte(0x01), toByte(0x01) };
byte[] data2 = new byte[] { toByte(0x02), toByte(0x02), toByte(0x02),
toByte(0x02), toByte(0x02) };
byte[] data3 = new byte[] { toByte(0x03), toByte(0x03), toByte(0x03),
toByte(0x03), toByte(0x03) };
byte[] data4 = new byte[] { toByte(0x04), toByte(0x04), toByte(0x04),
toByte(0x04), toByte(0x04) };
String targetIp = "";
String name = Thread.currentThread().getName();
if (name.contains("T1")) {
dataToSend = data1;
targetIp = udpServer1;
}
else if (name.contains("T2")){
dataToSend = data2;
targetIp = udpServer1;
}
else if (name.contains("T3")) {
dataToSend = data3;
targetIp = udpServer2;
}
else {
dataToSend = data4;
targetIp = udpServer2;
}
int count = 0;
while (count < 250) {
try {
sendAndReceive(targetIp, name, count, dataToSend);
Thread.sleep(delayMillis);
}
catch (IOException | InterruptedException e) {
System.out.println(e + ": " + name + ", iter: " + count);
}
finally {
++count;
}
}
}
private synchronized void sendAndReceive(String targetIp, String threadName, int count, byte[] dataToSend) throws IOException, UnknownHostException, SocketException {
byte[] rcvData = new byte[5];
DatagramPacket rcvPacket = new DatagramPacket(rcvData, rcvData.length);
socket.send(new DatagramPacket(dataToSend, 5, InetAddress.getByName(targetIp), 9999));
socket.setSoTimeout(2000);
rcvPacket = new DatagramPacket(rcvData, rcvData.length);
socket.receive(rcvPacket);
printData(threadName, count, rcvData);
if (threadName.contains("T1")) {
Assert.assertArrayEquals(new byte[] { toByte(0x01), toByte(0x01), toByte(0x01),
toByte(0x01), toByte(0x01) }, rcvData);
}
else if (threadName.contains("T2")){
Assert.assertArrayEquals(new byte[] { toByte(0x02), toByte(0x02), toByte(0x02),
toByte(0x02), toByte(0x02) }, rcvData);
}
else if (threadName.contains("T3")){
Assert.assertArrayEquals(new byte[] { toByte(0x03), toByte(0x03), toByte(0x03),
toByte(0x03), toByte(0x03) }, rcvData);
}
else if (threadName.contains("T4")){
Assert.assertArrayEquals(new byte[] { toByte(0x04), toByte(0x04), toByte(0x04),
toByte(0x04), toByte(0x04) }, rcvData);
}
}
private void printData(String name, int count, byte[] rcvData) {
String prefix = "";
for (int i = 0; i < rcvData.length; i++) {
prefix = (i == 0) ? (name + " iter " + count + ": ") : "";
System.out.print(prefix + Integer.toHexString((byte) ((byte) (0xFF) & rcvData[i])) + " ");
}
System.out.println();
}
}
查看代码,因此有两个服务器位于两台不同的机器上,我认为侦听相同的端口 999。因此,我认为问题可能出在服务器代码上。当您回复客户端时,您是否这样做
byte[] rcvData = new byte[5];
DatagramPacket rcvPacket = new DatagramPacket(rcvData, rcvData.length);
socket.receive(rcvPacket);
DatagramPacket sendPacket = new DatagramPacket(rcvPacket.getData(), rcvPacket.getLength(), rcvPacket.getAddress(), rcvPacket.getPort()); // note that here we need to use the received packet information on where to send the response back
socket.send(sendPacket);
几条附加评论
- 使构造函数 UdpClientRunnable 能够带到其他输入目标端口和目标地址,这样你就不需要在里面有所有的 if else 块
- 从
sendAndReceive
中删除同步的关键字,因为run
已同步