Java,SocketChannel 选择器,将写入通道与阻塞队列相结合



我目前正在尝试从一个线程操作 SocketChannel(我之前用两个线程和常规套接字实现了我想做的事情,但每个客户端两个线程似乎有点多(。我希望能够在有数据要读取时读取(选择器对此工作正常(。我只想在阻塞队列中有项目时编写(在我的示例中,我有帧队列(。

        @Override
        public void run() {
            super.run();
            SelectionKey readKey = null;
            try {
                final int interests = SelectionKey.OP_READ;
                socketChannel.configureBlocking(false);
                readKey = socketChannel.register(selector, interests);
            } catch (IOException e) {
                e.printStackTrace();
                try {
                    close();
                } catch (Exception e1) {
                    throw new RuntimeException("FAILURE");
                }
            }
            if (null != readKey) {
                while (running) {
                    try {
                        System.out.println("LOOP ENTRY");
                        selector.select();
                        if (readKey.isReadable()) {
                            System.out.println("IS READABLE");
                        }
                        if (readKey.isWritable() && (null != framesQueues.peek())) {
                            System.out.println("IS WRITEABLE");
                        }
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
现在,它

所做的是,它疯狂地循环而不停止,这显然很糟糕。我正在寻找一种方法,当我的阻塞队列中有项目或有字节要读取时,我的选择器唤醒。蔚来汽车中是否有允许这样做的工具?

如果没有,我可以实施什么?我注定要为每个客户端使用两个线程吗?这是一个爱好项目,但我正在努力实现尽可能低的延迟,所以睡眠循环不是我想要的。

我弄乱了nio插座,我把一些东西放在一起,希望很容易理解。您需要做的就是telnet localhost 5050.我无法访问您的其余代码,所以我不知道您缺少什么。我假设您没有从选择器中清除选定的键,或者可能仅在完成编写后才将兴趣操作更改为 (READ(。

public static void main(String... args) throws IOException {
    final Selector selector = Selector.open();
    //every 10 seconds this thread will go through all the connections and
    //send "(x times) (date) to every client
    new Thread() {
        public void run() {
            for (int i = 0; selector.isOpen(); i++) {
                for (SelectionKey key : selector.keys()) {
                    if (key.channel() instanceof SocketChannel) {
                        ((Queue<ByteBuffer>) key.attachment()).add(ByteBuffer.wrap((i + " - " + new Date() + "n").getBytes()));
                        key.interestOps(OP_READ | OP_WRITE); //enable write flag
                    }
                }
                selector.wakeup(); //wakeup so it can get to work and begin writing
                try {
                    Thread.sleep(10000);
                } catch (InterruptedException e) {
                }
            }
        }
    }.start();

    //create server on port 5050
    ServerSocketChannel server = ServerSocketChannel.open();
    server.configureBlocking(false);
    server.bind(new InetSocketAddress(5050));
    server.register(selector, OP_ACCEPT);
    //reusable buffer
    final ByteBuffer readBuffer = ByteBuffer.allocate(0x1000);
    while (selector.isOpen()) {
        int selected = selector.select();
        System.out.println("Selected " + selected + (selected == 1 ? " key." : " keys."));
        if (selected > 0) {
            for (SelectionKey key : selector.selectedKeys()) {
                if (key.isValid() && key.isReadable()) {
                    System.out.println("Readable: " + key.channel());
                    SocketChannel socket = ((SocketChannel) key.channel());
                    readBuffer.clear();
                    int read = socket.read(readBuffer);
                    if (read == -1) {
                        System.out.println("Socket Closed " + key.channel());
                        socket.close();
                        continue; //socket is closed. continue loop
                    }
                    //we will add what the client sent to the queue to echo it back
                    if (read > 0) {
                        readBuffer.flip();
                        ByteBuffer buffer = ByteBuffer.allocate(readBuffer.remaining());
                        ((Queue<Buffer>) key.attachment()).add(buffer.put(readBuffer).flip());
                        key.interestOps(OP_WRITE | OP_READ); //enable write flag
                    }
                }
                if (key.isValid() && key.isWritable()) {
                    System.out.println("Writable: " + key.channel());
                    SocketChannel socket = (SocketChannel) key.channel();
                    //retrieve attachment(ArrayBlockingQueue<ByteBuffer>)
                    Queue<Buffer> dataToWrite = (Queue<Buffer>) key.attachment();
                    //only remove from queue once we have completely written
                    //this is why we call peek first, and only remove once (buffer.remaining() == 0)
                    for (ByteBuffer buffer; (buffer = (ByteBuffer) dataToWrite.peek()) != null;) {
                        socket.write(buffer);
                        if (buffer.remaining() == 0) dataToWrite.remove();
                        else break; //can not write anymore. Wait for next write event
                    }
                    //once queue is empty we need to stop watching for write events
                    if (dataToWrite.isEmpty()) key.interestOps(OP_READ);
                }
                if (key.isValid() && key.isAcceptable()) {
                    System.out.println("Acceptable: " + key.channel());
                    SocketChannel socket = ((ServerSocketChannel) key.channel()).accept();
                    socket.configureBlocking(false);
                    //add a ArrayBlockingQueue<ByteBuffer> as an attachment for the socket
                    socket.register(selector, OP_READ, new ArrayBlockingQueue<ByteBuffer>(1000));
                }
            }
            selector.selectedKeys().clear(); //must clear all when finished or loop will continue selecting nothing
        }
    }
}

最新更新