Netty ChannelInboundHandlerAdapter异步/多线程



我很难理解netty、EventLoopGroup(MultithreadEventLoopGroup(、MultithreadEventExecutorGroupDefaultEventExecutorGroup中多线程背后的概念

我试图了解服务器如何处理多个客户端同时发送请求,这些请求将执行一些业务逻辑和CRUD操作,这些操作将添加到RTT中。下面是我的netty服务器代码,它可以工作,但我正在努力了解它将如何与并发用户和多个开放渠道一起工作。

我有一个简单的ServerBootstrap

@Component
@RequiredArgsConstructor
public class SocketServer {
private final ContextAwareLogger logger;
private final ServerInitializer serverInitializer;
private final NioEventLoopGroup bossGroup;
private final NioEventLoopGroup workerGroup;
private Channel mainChannel;
@PostConstruct
public void start() {
try {
ServerBootstrap bootstrap = init();
mainChannel = bootstrap.bind(8484).sync().channel(); // save the main channel so we can cleanly close it when app is shutdown
logger.info("Netty server started...");
} catch (Exception e) {
e.printStackTrace();
}
}
@PreDestroy
public void stop() throws InterruptedException {
logger.info("Shutting down Netty server");
bossGroup.shutdownGracefully().sync();
workerGroup.shutdownGracefully().sync();
mainChannel.closeFuture().sync();
logger.info("Netty Server shutdown complete.");
}
private ServerBootstrap init() {
return new ServerBootstrap()
.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 5000)
.option(ChannelOption.TCP_NODELAY, true)
.option(ChannelOption.SO_KEEPALIVE, true)
.childHandler(serverInitializer);
}
}

ChannelInitializer:

@Component
@RequiredArgsConstructor
public class ServerInitializer extends ChannelInitializer<SocketChannel> {
private final PacketDecoder packetDecoder;
private final ServerHandler serverHandler;
private final PacketEncoder packetEncoder;
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline()
.addLast("decoder", packetDecoder) // ByteArrayDecoder
.addLast("encoder", packetEncoder) // ByteArrayEncoder
.addLast("inbound", serverHandler); // ChannelInboundHandlerAdapter
}
}

ChannelInboundHandlerAdapter:

@Component
@Sharable
public class ServerHandler extends ChannelInboundHandlerAdapter {
@Autowired
private SomeService someService;
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
// contains db access
byte[] accept = someService.validateClient(ctx.channel());
ctx.channel().writeAndFlush(accept);
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// may contain db access
byte[] response = someService.processPacket(ctx.channel(), msg));
ctx.channel().writeAndFlush(response);
}
}

现在,当客户端连接时,我知道将打开一个新的通道,并且将重用处理程序。要求每个客户端请求/响应都需要立即处理,而无需等待其他客户端的CRUD操作完成。

我的channelRead和channelActive等是否是异步的,因为我使用的是NioEventLoopGroup(即每个客户端的通道操作是否独立运行(?

如果一个客户端连续发送多个请求,是否保证以相同的顺序处理这些请求?

我需要为入站处理程序指定DefaultEventExecutorGroup吗?(https://stackoverflow.com/a/28305019/1738539)

您需要为ServerHandler使用DefaultEventExecutorGroup,或者将validateClient(...)/processPacket(...)调度到您自己的ThreadPool。否则将导致EventLoop线程阻塞,因此在阻塞操作完成之前,无法处理此EventLoop的其他IO。

最新更新