如何使用Disruptor制作基于Java NIO(非阻塞IO)的TCP服务器?



我正在尝试使用Disruptor实现基于JAVA NIO的TCP服务器。

Java NIO以非阻塞方式工作。所有新连接首先到达负责接受连接的ServerSocketChannel。随后,selector.select() 方法返回就绪的键(SelectionKey),并根据键的状态调用相应的处理程序:如果键可接受(acceptable),则创建一个新的套接字通道并将其注册到选择器;如果键可读(readable),则从通道读取内容,然后注册写事件;如果键可写(writable),则把对应的响应写入该通道。最简单的基于NIO的服务器在单个线程中工作(所有处理程序和选择器都在同一线程中)。

Java Disruptor是一个高性能的环形缓冲区(Ring Buffer)实现,可用于在不同组件(线程)之间传递消息。

我的问题如下。

  1. 我们可以在NIO设计中使用多个线程吗?

  2. 我们可以在单独的线程中运行事件处理程序吗?

  3. 如果我们可以在单独的线程中运行事件处理程序,我们如何在线程之间传递 selectionKeys 和通道?

  4. Java Disruptor 库可以用于在主线程(选择器运行)和 eventHandler 线程之间传输数据吗?

  5. 如果可能,设计方法是什么?(EventProducer、EventConsumer 和 RingBuffer 在 Disruptor 中的行为是什么?)

您可以使用任何线程间消息传递方法来创建基于NIO的服务器,Disruptor就是其中一种选择。

这里需要解决的问题是:如何把工作分配给不同的线程,而不是在主线程中直接处理请求。

因此,您可以使用Disruptor作为消息传递方法,把从套接字连接读到的缓冲区传递给单独的线程。此外,您还需要维护一个共享的并发哈希映射(ConcurrentHashMap),用来通知主线程(运行事件循环的线程)响应是否已准备就绪。下面是一个示例。

HttpEvent.java

import java.nio.ByteBuffer;
/**
 * Mutable slot carried through the ring buffer: one request's buffer, the id
 * it is tracked under, and the number of bytes that were read into the buffer.
 */
public class HttpEvent
{
    private String requestId;
    private ByteBuffer buffer;
    private int numRead;

    /** @return the id this request is tracked under, or {@code null} if unset */
    public String getRequestId() {
        return this.requestId;
    }

    /** @param requestId the id this request is tracked under */
    public void setRequestId(String requestId) {
        this.requestId = requestId;
    }

    /** @return the buffer attached to this event, or {@code null} if unset */
    public ByteBuffer getBuffer() {
        return this.buffer;
    }

    /** @param buffer the buffer to attach to this event */
    public void setBuffer(ByteBuffer buffer) {
        this.buffer = buffer;
    }

    /** @return the recorded byte count ({@code 0} until set) */
    public int getNumRead() {
        return this.numRead;
    }

    /** @param numRead the byte count to record for this event */
    public void setNumRead(int numRead) {
        this.numRead = numRead;
    }
}

HttpEventFactory.java

import com.lmax.disruptor.EventFactory;
public class HttpEventFactory implements EventFactory<HttpEvent>
{
public HttpEvent newInstance()
{
return new HttpEvent();
}
}

HttpEventHandler.java

import com.lmax.disruptor.EventHandler;
import java.nio.ByteBuffer;
import java.util.Dictionary;
import java.util.concurrent.ConcurrentHashMap;
public class HttpEventHandler implements EventHandler<HttpEvent>
{
private int id;
private ConcurrentHashMap concurrentHashMap;
public HttpEventHandler(int id, ConcurrentHashMap concurrentHashMap){
this.id = id;
this.concurrentHashMap = concurrentHashMap;
}
public void onEvent(HttpEvent event, long sequence, boolean endOfBatch) throws Exception
{
if( sequence % Runtime.getRuntime().availableProcessors()==id){

String requestId = event.getRequestId();
ByteBuffer buffer = event.getBuffer();
int numRead= event.getNumRead();
ByteBuffer responseBuffer = handleRequest(buffer, numRead);

this.concurrentHashMap.put(requestId, responseBuffer);
}
}
private ByteBuffer handleRequest(ByteBuffer buffer, int numRead) throws Exception {
buffer.flip();
byte[] data = new byte[numRead];
System.arraycopy(buffer.array(), 0, data, 0, numRead);
String request = new String(data, "US-ASCII");
request = request.split("n")[0].trim();

String response = serverRequest(request);
buffer.clear();
buffer.put(response.getBytes());
return  buffer;
}
private String serverRequest(String request) throws Exception {
String response = "Sample Response";
if (request.startsWith("GET")) {
// http request parsing and response generation should be done here.    

return  response;
}
}

HttpEventMain.java

import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import org.apache.commons.lang3.RandomStringUtils;
import java.io.IOException;
import java.net.*;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
/**
 * Single-threaded NIO selector loop that hands request buffers to Disruptor
 * worker handlers and writes their responses back to the client.
 *
 * <p>Flow per connection: accept, read once into a fresh buffer, publish the
 * buffer through {@link HttpEventProducer}, switch the key to write interest,
 * and on each writable event poll the shared response map until the pending
 * placeholder has been replaced; then send the buffer and close the channel.
 *
 * <p>NOTE(review): while a response is pending the key stays registered for
 * OP_WRITE, so {@code select()} is likely to return immediately and the loop
 * polls rather than sleeps. Confirm whether that is acceptable for your load.
 */
public class HttpEventMain
{
    /** Value stored under a request id while its response is still pending. */
    private static final String PENDING = "0";

    private InetAddress addr;
    private int port;
    private Selector selector;
    private HttpEventProducer producer;
    /** requestId -> {@link #PENDING} placeholder or the response buffer. */
    private ConcurrentHashMap<String, Object> concurrentHashMapResponse;
    /** selection key -> requestId of the request awaiting a response on that key. */
    private final ConcurrentHashMap<SelectionKey, String> concurrentHashMapKey;

    /**
     * @param addr address to bind to; {@code null} is passed through to {@link InetSocketAddress}
     * @param port TCP port to listen on
     * @throws IOException declared for compatibility with existing callers
     */
    public HttpEventMain(InetAddress addr, int port) throws IOException {
        // Assign fields directly instead of calling overridable setters from the constructor.
        this.addr = addr;
        this.port = port;
        this.concurrentHashMapResponse = new ConcurrentHashMap<>();
        this.concurrentHashMapKey = new ConcurrentHashMap<>();
    }

    /**
     * Wires up the Disruptor with one handler per available processor and runs
     * the server on port 4333 until a fatal I/O error occurs.
     */
    public static void main(String[] args) throws Exception
    {
        int workers = Runtime.getRuntime().availableProcessors();
        System.out.println("----- Running the server on machine with " + workers + " cores -----");
        HttpEventMain server = new HttpEventMain(null, 4333);

        HttpEventFactory factory = new HttpEventFactory();

        int bufferSize = 1024;

        Executor executor = Executors.newFixedThreadPool(workers); // a thread pool to which we can assign tasks

        Disruptor<HttpEvent> disruptor = new Disruptor<HttpEvent>(factory, bufferSize, executor);
        HttpEventHandler[] handlers = new HttpEventHandler[workers];
        for (int i = 0; i < workers; i++) {
            handlers[i] = new HttpEventHandler(i, server.getConcurrentHashMapResponse());
        }

        disruptor.handleEventsWith(handlers);
        disruptor.start();

        RingBuffer<HttpEvent> ringBuffer = disruptor.getRingBuffer();
        server.setProducer(new HttpEventProducer(ringBuffer, server.getConcurrentHashMapResponse()));
        try {
            System.out.println("\n====================Server Details====================");
            System.out.println("Server Machine: " + InetAddress.getLocalHost().getCanonicalHostName());
            System.out.println("Port number: " + server.getPort());
        } catch (UnknownHostException e1) {
            e1.printStackTrace();
        }
        try {
            server.start();
        } catch (IOException e) {
            System.err.println("Error occured in HttpEventMain:" + e.getMessage());
            // Non-zero status: the server stopped because of an error.
            System.exit(1);
        }
    }

    /**
     * Opens the selector and the listening channel, then dispatches ready keys forever.
     *
     * @throws IOException if the listening socket cannot be set up, or if
     *                     selecting or accepting fails
     */
    private void start() throws IOException {
        this.selector = Selector.open();
        ServerSocketChannel serverChannel = ServerSocketChannel.open();
        serverChannel.configureBlocking(false);

        InetSocketAddress listenAddr = new InetSocketAddress(this.addr, this.port);
        serverChannel.socket().bind(listenAddr);
        serverChannel.register(this.selector, SelectionKey.OP_ACCEPT);
        System.out.println("Server ready. Ctrl-C to stop.");
        while (true) {
            this.selector.select();

            Iterator<SelectionKey> keys = this.selector.selectedKeys().iterator();
            while (keys.hasNext()) {
                SelectionKey key = keys.next();
                keys.remove();
                if (!key.isValid()) {
                    continue;
                }
                if (key.isAcceptable()) {
                    this.accept(key);
                    continue;
                }
                try {
                    if (key.isReadable()) {
                        this.read(key);
                    } else if (key.isWritable()) {
                        this.write(key);
                    }
                } catch (IOException e) {
                    // A failure on one client connection must not stop the whole server.
                    System.err.println("Dropping connection after I/O error: " + e);
                    this.closeConnection(key);
                }
            }
        }
    }

    /**
     * Accepts a pending connection and registers it for reading.
     */
    private void accept(SelectionKey key) throws IOException {
        ServerSocketChannel serverChannel = (ServerSocketChannel) key.channel();
        SocketChannel channel = serverChannel.accept();
        if (channel == null) {
            // Non-blocking accept returns null when no connection is actually pending.
            return;
        }
        channel.configureBlocking(false);
        channel.register(this.selector, SelectionKey.OP_READ);
    }

    /**
     * Reads one chunk from the client, publishes it for processing under a new
     * request id, and switches the key to write interest with the buffer attached.
     */
    private void read(SelectionKey key) throws IOException {
        SocketChannel channel = (SocketChannel) key.channel();
        ByteBuffer buffer = ByteBuffer.allocate(8192);
        int numRead = -1;
        try {
            numRead = channel.read(buffer);
        } catch (IOException e) {
            // Treated like end-of-stream below: the connection is closed.
            System.err.println("Read failed, closing connection: " + e);
        }
        if (numRead == -1) {
            channel.close();
            key.cancel();
            return;
        }
        if (numRead == 0) {
            // Nothing arrived; stay registered for OP_READ and wait for data.
            return;
        }
        String requestID = RandomStringUtils.random(15, true, true);
        while (concurrentHashMapKey.containsValue(requestID) || concurrentHashMapResponse.containsKey(requestID)) {
            requestID = RandomStringUtils.random(15, true, true);
        }
        concurrentHashMapKey.put(key, requestID);
        this.producer.onData(requestID, buffer, numRead);
        channel.register(this.selector, SelectionKey.OP_WRITE, buffer);
    }

    /**
     * Checks whether the response for the request tracked under {@code key} has
     * replaced the pending placeholder; if so, both map entries are removed.
     *
     * @return {@code true} exactly once per request, when its response is available
     */
    private boolean responseReady(SelectionKey key) {
        String requestId = concurrentHashMapKey.get(key);
        if (requestId == null) {
            return false;
        }
        Object response = concurrentHashMapResponse.get(requestId);
        // Compare by value; the original identity comparison only worked through string interning.
        if (response == null || PENDING.equals(response)) {
            return false;
        }
        concurrentHashMapKey.remove(key);
        concurrentHashMapResponse.remove(requestId);
        return true;
    }

    /**
     * Sends the attached buffer once its response is ready and closes the
     * channel after the last byte has been written.
     */
    private void write(SelectionKey key) throws IOException {
        ByteBuffer inputBuffer = (ByteBuffer) key.attachment();
        if (concurrentHashMapKey.containsKey(key)) {
            // Still tracked: the response has not been claimed yet.
            if (!responseReady(key)) {
                return;
            }
            // Flip exactly once, when the response is first picked up.
            inputBuffer.flip();
        }
        SocketChannel channel = (SocketChannel) key.channel();
        channel.write(inputBuffer);
        if (inputBuffer.hasRemaining()) {
            // Partial non-blocking write: keep OP_WRITE and continue on the next event.
            return;
        }
        channel.close();
        key.cancel();
    }

    /**
     * Best-effort teardown of a failed connection: forgets its tracking entries,
     * closes the channel, and cancels the key. A handler still working on the
     * request may store its response afterwards; that entry is not reclaimed here.
     */
    private void closeConnection(SelectionKey key) {
        String requestId = concurrentHashMapKey.remove(key);
        if (requestId != null) {
            concurrentHashMapResponse.remove(requestId);
        }
        try {
            key.channel().close();
        } catch (IOException ignored) {
            // The connection is being discarded anyway; nothing more to do.
        }
        key.cancel();
    }

    public HttpEventProducer getProducer() {
        return producer;
    }

    public void setProducer(HttpEventProducer producer) {
        this.producer = producer;
    }

    public ConcurrentHashMap<String, Object> getConcurrentHashMapResponse() {
        return concurrentHashMapResponse;
    }

    public void setConcurrentHashMapResponse(ConcurrentHashMap<String, Object> concurrentHashMapResponse) {
        this.concurrentHashMapResponse = concurrentHashMapResponse;
    }

    public InetAddress getAddr() {
        return addr;
    }

    public void setAddr(InetAddress addr) {
        this.addr = addr;
    }

    public int getPort() {
        return port;
    }

    public void setPort(int port) {
        this.port = port;
    }

    public Selector getSelector() {
        return selector;
    }

    public void setSelector(Selector selector) {
        this.selector = selector;
    }
}

HttpEventProducer.java

import com.lmax.disruptor.RingBuffer;
import java.nio.ByteBuffer;
import java.util.concurrent.ConcurrentHashMap;
/**
 * Publishes incoming requests onto the Disruptor ring buffer and marks each
 * request id as pending in the shared response map.
 */
public class HttpEventProducer
{
    private final RingBuffer<HttpEvent> ringBuffer;
    private final ConcurrentHashMap<String, Object> concurrentHashMap;

    /**
     * @param ringBuffer        ring buffer the events are published to
     * @param concurrentHashMap shared map that receives the {@code "0"} pending marker
     */
    public HttpEventProducer(RingBuffer<HttpEvent> ringBuffer, ConcurrentHashMap<String, Object> concurrentHashMap)
    {
        this.ringBuffer = ringBuffer;
        this.concurrentHashMap = concurrentHashMap;
    }

    /**
     * Marks {@code requestId} as pending and publishes the request for processing.
     *
     * @param requestId id the request is tracked under; must not be {@code null}
     * @param buffer    buffer holding the request bytes
     * @param numRead   number of request bytes in {@code buffer}
     */
    public void onData(String requestId, ByteBuffer buffer, int numRead)
    {
        // Store the pending marker before a sequence is claimed: it is still in
        // place before the event becomes visible to consumers, and a failure here
        // (e.g. a null id) can no longer leave a claimed sequence unpublished.
        concurrentHashMap.put(requestId, "0");

        long sequence = ringBuffer.next();
        try
        {
            HttpEvent event = ringBuffer.get(sequence);
            event.setBuffer(buffer);
            event.setRequestId(requestId);
            event.setNumRead(numRead);
        }
        finally
        {
            // Always publish a claimed sequence, otherwise the ring buffer stalls.
            ringBuffer.publish(sequence);
        }
    }
}

最新更新