如何在 UDP 协议上同步通信



我进行了模拟测试,以了解我在UDP通信中面临的问题。

设置

我有一个运行 4 个 UDP 客户端的主机,每个客户端都在自己的线程(T1、T2、T3 和 T4)中。T1 和 T2 共享一个名为 socket 的数据报套接字对象,而 T3 和 T4 共享一个名为 socket2 的数据报套接字对象。

T1 和

T2 正在发送和接收来自 IP (例如) udpServer1 的 UDP 服务器的回声,而 T3 和 T4 正在与 udpServer2 通信。

问题

在并行运行线程时,我能够同步共享相同 DatagramSocket 和 Runnable 对象的 T1 和 T2。但是,当尝试运行使用不同 DatagramSocket 和 Runnable 对象的 T1 和 T3 时,T3 总是因 SocketTimeoutException 而失败,但 T1 可以毫无问题地发送和接收数据包。

总结:

  1. 并行运行 T1 和 T2(或 T3 和 T4),共享相同的 DatagramSocket 和 Runnable 对象 --> OK

  2. 并行运行 T1 和 T3,每个都使用自己的 DatagramSocket 和 Runnable 对象 -->其中一个对象 SocketTimeoutException。

问题

为什么 T3 不断获得 SocketTimeoutException,即使它在自己的 DatagramSocket 上并在自己的线程上运行?我在这里做错了什么?

任何帮助都非常感谢。谢谢。

测试代码

@Test
public void sendRemoteTest() throws InterruptedException, IOException {
    DatagramSocket socket = new DatagramSocket(9987, InetAddress.getByName(localMachineIp));
    DatagramSocket socket2 = new DatagramSocket(9988, InetAddress.getByName(localMachineIp));
    UdpClientRunnable runnable = new UdpClientRunnable(socket, 1);
    UdpClientRunnable runnable2 = new UdpClientRunnable(socket2, 1);
   
    Thread t1 = new Thread(runnable);
    t1.setName("T1");
    Thread t2 = new Thread(runnable);
    t2.setName("T2");
    Thread t3 = new Thread(runnable2);
    t3.setName("T3");
    Thread t4 = new Thread(runnable2);
    t4.setName("T4");
    t1.start();
    t2.start();
    t3.start();
    t4.start();
    Thread.sleep(10000);
    t1.join();
    t2.join();
    t3.join();
    t4.join();
}
private byte toByte(int num) {
    return (byte) ((byte) (0xFF) & num);
}
public class UdpClientRunnable implements Runnable {
    private DatagramSocket socket;
    private long delayMillis;
    private byte[] dataToSend;
    public UdpClientRunnable(DatagramSocket socket, long delayMilis) {
       this.socket = socket;
       this.delayMillis = delayMilis;
    }
    @Override
    public synchronized void run() {
       byte[] data1 = new byte[] { toByte(0x01), toByte(0x01), toByte(0x01),
          toByte(0x01), toByte(0x01) };
       byte[] data2 = new byte[] { toByte(0x02), toByte(0x02), toByte(0x02),
          toByte(0x02), toByte(0x02) };
       byte[] data3 = new byte[] { toByte(0x03), toByte(0x03), toByte(0x03),
          toByte(0x03), toByte(0x03) };
       byte[] data4 = new byte[] { toByte(0x04), toByte(0x04), toByte(0x04),
          toByte(0x04), toByte(0x04) };
       String targetIp = "";
       String name = Thread.currentThread().getName();
       if (name.contains("T1")) {
          dataToSend = data1;
          targetIp = udpServer1;
       }
       else if (name.contains("T2")){
          dataToSend = data2;
          targetIp = udpServer1;
       }
       else if (name.contains("T3")) {
          dataToSend = data3;
          targetIp = udpServer2;
       }
       else {
          dataToSend = data4;
          targetIp = udpServer2;
       }
       int count = 0;
       while (count < 250) {
          try {
             sendAndReceive(targetIp, name, count, dataToSend);
             Thread.sleep(delayMillis);
          }
          catch (IOException | InterruptedException e) {
             System.out.println(e + ": " + name + ", iter: " + count);
          }
          finally {
             ++count;
          }
       }
    }
    private synchronized void sendAndReceive(String targetIp, String threadName, int count, byte[] dataToSend) throws IOException, UnknownHostException, SocketException {
        byte[] rcvData = new byte[5];
        DatagramPacket rcvPacket = new DatagramPacket(rcvData, rcvData.length);
        socket.send(new DatagramPacket(dataToSend, 5, InetAddress.getByName(targetIp), 9999));
        socket.setSoTimeout(2000);
        rcvPacket = new DatagramPacket(rcvData, rcvData.length);
        socket.receive(rcvPacket);
        printData(threadName, count, rcvData);
        if (threadName.contains("T1")) {
            Assert.assertArrayEquals(new byte[] { toByte(0x01), toByte(0x01), toByte(0x01),
                toByte(0x01), toByte(0x01) }, rcvData);
        }
        else if (threadName.contains("T2")){
            Assert.assertArrayEquals(new byte[] { toByte(0x02), toByte(0x02), toByte(0x02),
                toByte(0x02), toByte(0x02) }, rcvData);
        }
        else if (threadName.contains("T3")){
            Assert.assertArrayEquals(new byte[] { toByte(0x03), toByte(0x03), toByte(0x03),
                toByte(0x03), toByte(0x03) }, rcvData);
        }
        else if (threadName.contains("T4")){
            Assert.assertArrayEquals(new byte[] { toByte(0x04), toByte(0x04), toByte(0x04),
                toByte(0x04), toByte(0x04) }, rcvData);
        }
    }
    private void printData(String name, int count, byte[] rcvData) {
        String prefix = "";
        for (int i = 0; i < rcvData.length; i++) {
            prefix = (i == 0) ? (name + " iter " + count + ": ") : "";
            System.out.print(prefix + Integer.toHexString((byte) ((byte) (0xFF) & rcvData[i])) + " ");
        }
        System.out.println();
    }
  
}

查看代码,因此有两个服务器位于两台不同的机器上,我认为侦听相同的端口 999。因此,我认为问题可能出在服务器代码上。当您回复客户端时,您是否这样做

byte[] rcvData = new byte[5];
DatagramPacket rcvPacket = new DatagramPacket(rcvData, rcvData.length);
socket.receive(rcvPacket);
DatagramPacket sendPacket = new DatagramPacket(rcvPacket.getData(), rcvPacket.getLength(), rcvPacket.getAddress(), rcvPacket.getPort());  // note that here we need to use the received packet information on where to send the response back
socket.send(sendPacket);

几条附加评论

  1. 使构造函数 UdpClientRunnable 能够带到其他输入目标端口和目标地址,这样你就不需要在里面有所有的 if else 块
  2. sendAndReceive中删除同步的关键字,因为run已同步

最新更新