NIO SocketChannel说当有数据时没有数据(或者选择器没有通知我)



我有一个功能正常的客户机-服务器设备,它可以使用NIO成功地相互连接和发送消息。

现在我唯一的困惑是,当socketChannel.read()返回0时,我应该如何继续阅读。

我有一个协议,它发送前4个字节作为预期的传入字节数。即使有这么多钱,我也会遇到一个潜在的问题。

然而,有时我可能会读到这样的内容:
5 // Read 5 bytes when calling socketChannel.read()
0 // Read 0 bytes when calling socketChannel.read() immediately after

当我到达0时,我认为我已经完成了读取,需要等待更多的数据。

但是,当我这样做时,OP_READ似乎不会在稍后再次执行selectNow()时被触发。我检查了键,它的readyops()和interestops()设置为1(这是OP_READ),但它不想认识到是时候再次读取了。

我发现如果我继续循环读取,我可能会得到这样的结果:

5 // socketChannel.read()
0 // socketChannel.read()
7 // socketChannel.read() (Done since I have all my bytes)
0
0
0
...

我在这里有点困惑,因为这意味着:

  • 那里没有数据,所以可用的0是合法的,但是当其余的数据进来时,选择器拒绝使用selectNow()

  • 返回键
  • 数据都在那里,但由于某种原因读取时返回0。

我应该在selectNow()返回它作为活动键后重新注册通道吗?(虽然我没有从OP_CONNECT切换到OP_READ之间…所以我猜不会)。我觉得盲目地在循环中打转是危险的,会浪费处理周期。

我应该继续调查他们吗?这让我在OP_READ实际触发时感到困惑。

这是由于我的一个错误,我没有在读取的字节缓冲区上调用.clear()。这将导致它返回0 read,即使数据已经流进。

对于想要了解简单客户端如何工作(尽管异常处理非常糟糕)的人来说,这个例子也很有用。不能保证这将正常工作,并且可能会有问题,因为它被设计为一个快速和肮脏的测试。

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
public class Test {
    public static final int PORT = 22222;
    public static void main(String[] args) throws IOException {
        Thread s = new Thread(new Server());
        Thread c = new Thread(new Client());
        s.start();
        c.start();
    }
}
class Client implements Runnable {
    public Selector selector;
    public SocketChannel sc;
    public Client() throws IOException {
        selector = Selector.open();
    }
    @Override
    public void run() {
        try {
            sc = SocketChannel.open();
            sc.socket().setTcpNoDelay(true);
            sc.configureBlocking(false);
            SelectionKey k = sc.register(selector, SelectionKey.OP_CONNECT);
            boolean firstConnect = sc.connect(new InetSocketAddress("localhost", Test.PORT));
            if (firstConnect) {
                System.out.println("Connected on first connect, de-registering OP_CONNECT");
                k.interestOps(SelectionKey.OP_READ);
            }
            while (true) {
                int keys = selector.selectNow();
                if (keys > 0) {
                    for (SelectionKey key : selector.selectedKeys()) {
                        if (key.isConnectable()) {
                            boolean finishConnectResult = sc.finishConnect();
                            key.interestOps(SelectionKey.OP_READ);
                            System.out.println("Finished connection: " + finishConnectResult);
                        }
                        if (key.isReadable()) {
                            ByteBuffer bb = ByteBuffer.allocate(2);
                            int bytesRead = 0;
                            while ((bytesRead = sc.read(bb)) > 0) {
                                bb.flip();
                                System.out.println(bytesRead + " bytes read");
                                System.out.println(bb.get() + ", " + bb.get());
                                //bb.clear(); // If this is not commented, it will not be handled properly.
                            }
                            System.out.println("Last bytes read value = " + bytesRead);
                            System.exit(0);
                        }
                    }
                }
                Thread.sleep(5);
            }
        } catch (Exception e) { 
            e.printStackTrace();
            throw new RuntimeException();
        }
    }
}
class Server implements Runnable {
    public Selector selector;
    public SocketChannel sc;
    public Server() throws IOException {
        selector = Selector.open();
        ServerSocketChannel ssc = ServerSocketChannel.open();
        ssc.configureBlocking(false);
        ssc.bind(new InetSocketAddress(Test.PORT));
        ssc.register(selector, SelectionKey.OP_ACCEPT);
    }
    @Override
    public void run() {
        boolean notSentData = true;
        try {
            while (true) {
                int keys = selector.selectNow();
                if (keys > 0) {
                    for (SelectionKey key : selector.selectedKeys()) {
                        if (key.isAcceptable()) {
                            ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
                            sc = ssc.accept();
                            if (sc != null) {
                                sc.configureBlocking(false);
                                sc.socket().setTcpNoDelay(true); // Required in my application
                                sc.register(selector, SelectionKey.OP_WRITE);
                                System.out.println("Server accepted connection");
                            } else {
                                System.out.println("Got null connection");
                            }
                        }
                    }
                }
                if (sc != null && notSentData) {
                    ByteBuffer bb = ByteBuffer.allocate(4);
                    bb.put(new byte[]{ 1, 2, 3, -1});
                    bb.flip();
                    int wrote = sc.write(bb);
                    System.out.println("Wrote " + wrote + " bytes");
                    notSentData = false;
                }
                Thread.sleep(5);
            }
        } catch (Exception e) {
            e.printStackTrace();
            throw new RuntimeException();
        }
    }
}

最新更新