Using the Servlet 3.1 non-blocking I/O features on Tomcat 8



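Has anyone tried the Servlet 3.1 non-blocking I/O API on Tomcat?

The request from the browser seems to wait forever. When I run the server in debug mode the call does return, but I still don't see "Data read.." and "Data written.." in the logs.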

Servlet:

@WebServlet(urlPatterns = "/asyncn", asyncSupported = true)
public class AsyncN extends HttpServlet {
    private static final long serialVersionUID = 1L;
    @Override
    protected void service(final HttpServletRequest request, final HttpServletResponse response) throws ServletException, IOException {
        println("Before starting job");
        final AsyncContext actx = request.startAsync();
        actx.setTimeout(Long.MAX_VALUE);
        actx.start(new HeavyTask(actx));
        println("After starting job");
    }
    class HeavyTask implements Runnable {
        AsyncContext actx;
        HeavyTask(AsyncContext actx) {
            this.actx = actx;
        }
        @Override
        public void run() {
            try {
                Thread.currentThread().setName("Job-Thread-" + actx.getRequest().getParameter("job"));
                // set up ReadListener to read data for processing
                ServletInputStream input = actx.getRequest().getInputStream();
                ReadListener readListener = new ReadListenerImpl(input, actx);
                input.setReadListener(readListener);
            } catch (IllegalStateException e) {
                e.printStackTrace();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
    public static void println(String output) {
        System.out.println("[" + Thread.currentThread().getName() + "]" + output);
    }
}

Listeners:

public class ReadListenerImpl implements ReadListener {
    private ServletInputStream input = null;
    private AsyncContext actx = null;
    // store the processed data to be sent back to client later
    private Queue<String> queue = new LinkedBlockingQueue<>();
    ReadListenerImpl(ServletInputStream input, AsyncContext actx) {
        this.input = input;
        this.actx = actx;
    }
    @Override
    public void onDataAvailable() throws IOException {
        println("Data is now available, starting to read");
        StringBuilder sb = new StringBuilder();
        int len = -1;
        byte b[] = new byte[8];
        // We need to check input#isReady before reading data.
        // The ReadListener will be invoked again when
        // the input#isReady is changed from false to true
        while (input.isReady() && (len = input.read(b)) != -1) {
            String data = new String(b, 0, len);
            sb.append(data);
        }
        println("Data read: "+sb.toString());
        queue.add(sb.toString());
    }
    @Override
    public void onAllDataRead() throws IOException {
        println("All Data read, now invoking write listener");
        // now all data are read, set up a WriteListener to write
        ServletOutputStream output = actx.getResponse().getOutputStream();
        WriteListener writeListener = new WriteListenerImpl(output, queue, actx);
        output.setWriteListener(writeListener);
    }
    @Override
    public void onError(Throwable throwable) {
        println("onError");
        actx.complete();
        throwable.printStackTrace();
    }
    public static void println(String output) {
        System.out.println("[" + Thread.currentThread().getName() + "]" + output);
    }
}
public class WriteListenerImpl implements WriteListener {
    private ServletOutputStream output = null;
    private Queue<String> queue = null;
    private AsyncContext actx = null;
    WriteListenerImpl(ServletOutputStream output, Queue<String> queue, AsyncContext actx) {
        this.output = output;
        this.queue = queue;
        this.actx = actx;
    }
    @Override
    public void onWritePossible() throws IOException {
        println("Ready to write, writing data");
         // write while there is data and is ready to write
        while (queue.peek() != null && output.isReady()) {
            String data = queue.poll();
            //do some processing here with the data
            try {
                data = data.toUpperCase();
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            println("Data written: "+data);
            output.print(data);
        }
        // complete the async process when there is no more data to write
        if (queue.peek() == null) {
            actx.complete();
        }
    }
    @Override
    public void onError(Throwable throwable) {
        println("onError");
        actx.complete();
        throwable.printStackTrace();
    }
    public static void println(String output) {
        System.out.println("[" + Thread.currentThread().getName() + "]" + output);
    }
}

Sysout logs:

[http-nio-8080-exec-4]Before starting job
[http-nio-8080-exec-4]After starting job

Sysout logs (when I run the server in debug mode):

[http-nio-8080-exec-6]Before starting job
[http-nio-8080-exec-6]After starting job
[http-nio-8080-exec-6]All Data read, now invoking write listener
[http-nio-8080-exec-6]Ready to write, writing data

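Creating a new thread is unnecessary. Set the ReadListener directly from the service method and everything will work asynchronously.

A couple of comments on your code. In the ReadListener you have: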

while (input.isReady() && (len = input.read(b)) != -1)  

I would suggest using this instead, to stick fully with the asynchronous API:

while (input.isReady() && !input.isFinished())
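
To see how a loop of this shape behaves across several onDataAvailable() callbacks, here is a minimal sketch in plain Java. FakeInput is a hypothetical stand-in for ServletInputStream (it is not part of the Servlet API), used only so the example runs without a container. The 8-byte buffer mirrors the question's code.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayDeque;
import java.util.Deque;

class ReadLoopSketch {
    // Hypothetical stand-in for ServletInputStream; NOT a real Servlet API class.
    static class FakeInput {
        private final Deque<Byte> buffered = new ArrayDeque<>();
        private boolean endOfStream = false;

        // Simulates bytes arriving from the client.
        void arrive(String data) {
            for (byte x : data.getBytes(StandardCharsets.UTF_8)) buffered.add(x);
        }
        // Simulates the client finishing the request body.
        void end() { endOfStream = true; }

        boolean isReady() { return !buffered.isEmpty() || endOfStream; }
        boolean isFinished() { return endOfStream && buffered.isEmpty(); }

        int read(byte[] b) {
            if (buffered.isEmpty()) return endOfStream ? -1 : 0;
            int n = 0;
            while (n < b.length && !buffered.isEmpty()) b[n++] = buffered.poll();
            return n;
        }
    }

    // Same loop condition as suggested above: read only while the stream
    // is ready and has not been fully consumed.
    static void onDataAvailable(FakeInput input, StringBuilder sb) {
        byte[] b = new byte[8];
        while (input.isReady() && !input.isFinished()) {
            int len = input.read(b);
            if (len > 0) sb.append(new String(b, 0, len, StandardCharsets.UTF_8));
        }
    }

    public static void main(String[] args) {
        FakeInput input = new FakeInput();
        StringBuilder sb = new StringBuilder();
        input.arrive("hello wor");   // first chunk, body not finished yet
        onDataAvailable(input, sb);  // returns once no more data is buffered
        input.arrive("ld");
        input.end();
        onDataAvailable(input, sb);  // second callback drains the rest
        System.out.println(sb);
    }
}
```

In a real listener the loop returns as soon as isReady() reports false, and the container invokes onDataAvailable() again when more data arrives.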

Also, in your WriteListener you have:

while (queue.peek() != null && output.isReady())

You should reverse the conditions:

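while (output.isReady() && queue.peek() != null)

This protects against calling actx.complete() too early if the very last write goes asynchronous. With output.isReady() evaluated first, it is still called after the final write, so a pending write is detected before the loop exits.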
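
The effect of the condition order can be illustrated with a small simulation in plain Java. This is only a sketch under assumptions: FakeOutput is a hypothetical stand-in for ServletOutputStream in which every write stays pending until the simulated container flushes it, and the `completed` flag stands in for actx.complete(). The sketch also keeps the result of the last isReady() call and completes only if it was true. That extra guard goes slightly beyond the suggestion above and is one possible way to make the protection explicit.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;

class WriteLoopSketch {
    // Hypothetical stand-in for ServletOutputStream; NOT a real Servlet API class.
    static class FakeOutput {
        private boolean ready = true;
        final List<String> written = new ArrayList<>();

        boolean isReady() { return ready; }

        void print(String s) {
            if (!ready) throw new IllegalStateException("write attempted while not ready");
            written.add(s);
            ready = false;           // the write is now pending
        }
        // Simulates the container finishing the pending write.
        void flushed() { ready = true; }
    }

    boolean completed = false;       // stands in for actx.complete()

    void onWritePossible(FakeOutput output, Queue<String> queue) {
        boolean ready;
        // isReady() comes first, so it is also evaluated after the last write.
        while ((ready = output.isReady()) && queue.peek() != null) {
            output.print(queue.poll().toUpperCase());
        }
        // Complete only when nothing is queued AND no write is still pending.
        if (ready && queue.peek() == null) {
            completed = true;
        }
    }

    public static void main(String[] args) {
        Queue<String> queue = new ArrayDeque<>(Arrays.asList("a", "b"));
        FakeOutput output = new FakeOutput();
        WriteLoopSketch listener = new WriteLoopSketch();
        while (!listener.completed) {
            listener.onWritePossible(output, queue);
            output.flushed();        // container finishes the write, then calls back
        }
        System.out.println(output.written);
    }
}
```

In this simulation the listener is called three times: once per queued item, and a final time in which the output is ready and the queue is empty, which is the only point where completion happens.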
