使用Java NIO发送数据时出现延迟



我需要您对Java NIO包的建议。我在网络上发送数据包时遇到延迟问题。最初的代码实际上是我将SFML这本书的源代码移植到Java的端口,但在这里我只向您展示一个最小的工作示例,在这个示例中,问题被重现。尽管这段代码确实包含了SFML库中的一些片段(实际上是创建了一个窗口和一个事件循环),但我相信这对问题没有影响。

在这里我只显示部分代码,完整版本在这里可用。

因此,该程序有两个实体:服务器和客户端。如果在服务器模式下启动应用程序,则会创建一个服务器,开始侦听新的连接,并自动创建一个新的客户端并尝试连接到服务器。在客户端模式下,只创建一个客户端并连接到服务器。

该应用程序还创建了一个新的基本GUI窗口,并启动一个事件循环,所有事情都发生在该循环中。

客户端向服务器发送数据包。它只通过记录接受的事实来处理它们。客户端可以发送两种类型的数据包:周期性数据包(具有增量ID)和事件数据包(应用程序对按下SPACE或M按钮作出反应)。

客户端发送数据包:

public void update(Time dt) throws IOException {
    if (!isConnected) return;
    if (tickClock.getElapsedTime().compareTo(Time.getSeconds(1.f / 20.f)) > 0) {
        Packet intervalUpdatePacket = new Packet();
        intervalUpdatePacket.append(PacketType.INTERVAL_UPDATE);
        intervalUpdatePacket.append(intervalCounter++);
        PacketReaderWriter.send(socketChannel, intervalUpdatePacket);
        tickClock.restart();
    }
}
public void handleEvent(Event event) throws IOException {
    if (isConnected && (event.type == Event.Type.KEY_PRESSED)) {
        KeyEvent keyEvent = event.asKeyEvent();
        if (keyEvent.key == Keyboard.Key.SPACE) {
            LOGGER.info("press SPACE");
            Packet spacePacket = new Packet();
            spacePacket.append(PacketType.SPACE_BUTTON);
            PacketReaderWriter.send(socketChannel, spacePacket);
        }
        if (keyEvent.key == Keyboard.Key.M) {
            LOGGER.info("press M");
            Packet mPacket = new Packet();
            mPacket.append(PacketType.M_BUTTON);
            PacketReaderWriter.send(socketChannel, mPacket);
        }
    }
}

服务器接受数据包:

private void handleIncomingPackets() throws IOException {
    readSelector.selectNow();
    Set<SelectionKey> readKeys = readSelector.selectedKeys();
    Iterator<SelectionKey> it = readKeys.iterator();
    while (it.hasNext()) {
        SelectionKey key = it.next();
        it.remove();
        SocketChannel channel = (SocketChannel) key.channel();
        Packet packet = null;
        try {
            packet = PacketReaderWriter.receive(channel);
        } catch (NothingToReadException e) {
            e.printStackTrace();
        }
        if (packet != null) {
            // Interpret packet and react to it
            handleIncomingPacket(packet, channel);
        }
    }
}
private void handleIncomingPacket(Packet packet, SocketChannel channel) {
    PacketType packetType = (PacketType) packet.get();
    switch (packetType) {
        case INTERVAL_UPDATE:
            int intervalId = (int) packet.get();
            break;
        case SPACE_BUTTON:
            LOGGER.info("handling SPACE button");
            break;
        case M_BUTTON:
            LOGGER.info("handling M button");
            break;
    }
}

这是一个PacketReaderWriter对象:

package server;
import java.io.*;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
public class PacketReaderWriter {
    private static final int PACKET_SIZE_LENGTH = 4;
    private static final ByteBuffer packetSizeReadBuffer = ByteBuffer.allocate(PACKET_SIZE_LENGTH);
    private static ByteBuffer clientReadBuffer;
    private static byte[] encode(Packet packet) throws IOException {
        try (
            ByteArrayOutputStream baos = new ByteArrayOutputStream();
            ObjectOutputStream oos = new ObjectOutputStream(baos)
        ) {
            oos.writeObject(packet);
            return baos.toByteArray();
        }
    }
    private static Packet decode(byte[] encodedPacket) throws IOException, ClassNotFoundException {
        try (ObjectInputStream oi = new ObjectInputStream(new ByteArrayInputStream(encodedPacket))) {
            return (Packet) oi.readObject();
        }
    }
    public static void send(SocketChannel channel, Packet packet) throws IOException {
        byte[] encodedPacket = encode(packet);
        ByteBuffer packetSizeBuffer = ByteBuffer.allocate(PACKET_SIZE_LENGTH).putInt(encodedPacket.length);
        packetSizeBuffer.flip();
        // Send packet size
        channel.write(packetSizeBuffer);
        // Send packet content
        ByteBuffer packetBuffer = ByteBuffer.wrap(encodedPacket);
        channel.write(packetBuffer);
    }
    public static Packet receive(SocketChannel channel) throws IOException, NothingToReadException {
        int bytesRead;
        // Read packet size
        packetSizeReadBuffer.clear();
        bytesRead = channel.read(packetSizeReadBuffer);
        if (bytesRead == -1) {
            channel.close();
            throw new NothingToReadException();
        }
        if (bytesRead == 0) return null;
        packetSizeReadBuffer.flip();
        int packetSize = packetSizeReadBuffer.getInt();
        // Read packet
        clientReadBuffer = ByteBuffer.allocate(packetSize);
        bytesRead = channel.read(clientReadBuffer);
        if (bytesRead == -1) {
            channel.close();
            throw new NothingToReadException();
        }
        if (bytesRead == 0) return null; 
        clientReadBuffer.flip();
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        baos.write(clientReadBuffer.array(), 0, bytesRead);
        clientReadBuffer.clear();
        try {
            return decode(baos.toByteArray());
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
            return null;
        }
    }
}

问题是:我在按下按钮(从客户端发送相应的数据包)和在服务器上接受这个数据包之间有很大的延迟。如果我以客户端模式启动应用程序的新实例(简而言之,只需添加一个新客户端),则延迟会变得更大。

我看不出为什么这些周期性的数据包会产生如此大的网络负载,以至于其他数据包无法通过,但也许我只是错过了一些东西。在这里,我不得不说,我不是Java专家,所以不要因为没有看到明显的东西而责怪我:)

有人有什么想法吗?

谢谢!

我决定看看Github回购。

您的Server.run()看起来是这样的。

    public void run() {
    while (isRunning) {
        try {
            handleIncomingConnections();
            handleIncomingPackets();
        } catch (IOException e) {
            e.printStackTrace();
        }
        try {
            // Sleep to prevent server from consuming 100% CPU
            sleep(100);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

睡眠(100)将导致每秒大约10次对handleIncomingPackets()的调用。handleIncomingPackets()将依次选择一个客户端通道,并对单个接收到的数据包调用handleIncoming Packet()。如果我理解正确的话,服务器总共可以每秒处理每个客户端10个数据包。

另一方面,客户端尝试每秒发送20个PacketType.INTERVAL_UPDATE类型的数据包。客户端每秒必须发送更少的数据包,或者服务器需要能够每秒处理更多的数据包。

当前的睡眠(100)意味着,在服务器能够对单个数据包做出响应之前,即使在非过载的情况下,也总是会有高达100ms左右的延迟。不过,如果你确保你真的读取了通道上所有可用的数据包,而不是每次只读取一个,这可能没问题。

总之:为了提高响应时间,你必须做的最小的改变就是减少sleep()时间。10毫秒就可以了。但我也建议尝试检查每次迭代中是否有一个以上的数据包可用。

更新:在你链接的c++文件中,我的直觉是它每次迭代读取的数据包不止一个。

<snip>
while (peer->socket.receive(packet) == sf::Socket::Done)
        {
            // Interpret packet and react to it
            handleIncomingPacket(packet, *peer, detectedTimeout);
</snip>

while循环将读取所有可用数据包。与Java版本相比,在Java版本中,每个客户端每次服务器迭代读取一个数据包。

if (packet != null) {
    // Interpret packet and react to it
    handleIncomingPacket(packet, channel);
}

您需要确保您也阅读了Java版本的所有可用数据包。

如果你只是想让自己相信客户端代码发送的数据包比服务器代码所能处理的要多,那么可以通过将sleep()临时设置为10ms来快速完成。

最新更新