Netty server does not receive all messages sent by the client



I have a Netty server and a Netty client in my project and want them to exchange messages.

Netty server code:

// Boss event loop group (accepts incoming connections)
private EventLoopGroup bossGroup = new NioEventLoopGroup();
// Worker event loop group (handles I/O on accepted connections)
private EventLoopGroup workerGroup = new NioEventLoopGroup();
// Server channel
private Channel serverChannel;

/**
 * Binds to the local address and starts listening.
 *
 * @throws Exception
 */
public void Start(int port) throws Exception {
    // Bootstrap
    ServerBootstrap serverBootstrap = new ServerBootstrap();
    // Set the event loop group for the acceptor and the one for accepted client connections
    serverBootstrap.group(bossGroup, workerGroup)
            // Factory pattern: creates NioServerSocketChannel instances
            .channel(NioServerSocketChannel.class)
            // Backlog queue size
            .option(ChannelOption.SO_BACKLOG, 100)
            // Address reuse
            .option(ChannelOption.SO_REUSEADDR, true)
            .option(ChannelOption.TCP_NODELAY, true)
            // Log level of the logging component
            .handler(new LoggingHandler(LogLevel.INFO))
            // Business handlers
            .childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel channel) throws Exception {
                    // Decoder
                    channel.pipeline().addLast("nettyMessageDecoder", new MicroMessageDecoder(1024, 4, 4));
                    // Encoder
                    channel.pipeline().addLast("nettyMessageEncoder", new MicroMessageEncoder());
                    // Business handler
                    channel.pipeline().addLast("nettyHandler", new MicroServerHandler());
                }
            });
    // Bind to the local host
    String host = "127.0.0.1";
    // Bind the port and wait synchronously until it succeeds
    ChannelFuture future = serverBootstrap.bind(host, port).sync();
    // Register a listener for the bind event
    future.addListener(cfl -> {
        if (cfl.isSuccess()) {
            logger.info("服务端[" + host + ":" + port + "]已上线...");
            serverChannel = future.channel();
        }
    });
    // Register a listener for the close event
    future.channel().closeFuture().addListener(cfl -> {
        // Shut down the server
        close();
        logger.info("服务端[" + host + ":" + port + "]已下线...");
    });
}

/**
 * Shuts down the server.
 */
public void close() {
    // Close the socket
    if (serverChannel != null) {
        serverChannel.close();
    }
    // Shut down the boss group
    if (bossGroup != null) {
        bossGroup.shutdownGracefully();
    }
    // Shut down the worker group
    if (workerGroup != null) {
        workerGroup.shutdownGracefully();
    }
}

Netty client code:

@Service
public class MicroClient {
    // Logger
    private static final Logger logger = LoggerFactory.getLogger(MicroClient.class);
    // Event loop group
    private EventLoopGroup group = new NioEventLoopGroup();
    // Bootstrap
    private Bootstrap bootstrap = new Bootstrap();
    // Client channel
    private Channel clientChannel;
    // Client handler
    private MicroClientHandler microClientHandler;

    /**
     * Connects to the server.
     * @param host
     * @param port
     * @throws InterruptedException
     */
    public void Connect(String host, int port) throws InterruptedException {
        microClientHandler = new MicroClientHandler();
        bootstrap.group(group)
                .channel(NioSocketChannel.class)
                .option(ChannelOption.TCP_NODELAY, true)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel channel) throws Exception {
                        // Decoder
                        channel.pipeline().addLast("nettyMessageDecoder", new MicroMessageDecoder(1024, 4, 4));
                        // Encoder
                        channel.pipeline().addLast("nettyMessageEncoder", new MicroMessageEncoder());
                        // Business handler
                        channel.pipeline().addLast("clientHandler", microClientHandler);
                    }
                });
        // Connect synchronously
        ChannelFuture channelFuture = bootstrap.connect(host, port).sync();
        // Check that the connection attempt has completed
        if (channelFuture.isDone()) {
            logger.info("客户端[" + channelFuture.channel().localAddress().toString() + "]已连接...");
            clientChannel = channelFuture.channel();
        }
        // Register a listener for the close event
        channelFuture.channel().closeFuture().addListener(cfl -> {
            close();
            logger.info("客户端[" + channelFuture.channel().localAddress().toString() + "]已断开...");
        });
    }

    /**
     * Shuts down the client.
     */
    private void close() {
        // Close the client socket
        if (clientChannel != null) {
            clientChannel.close();
        }
        // Shut down the client event loop group
        if (group != null) {
            group.shutdownGracefully();
        }
    }

    /**
     * Sends a message from the client.
     * @param microMessage
     */
    public void send(MicroMessage microMessage) {
        microClientHandler.send(microMessage);
    }
}

Server handler code:

public class MicroServerHandler extends ChannelInboundHandlerAdapter {
    private static final Logger logger = LoggerFactory.getLogger(MicroServerHandler.class);

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        MicroMessage message = (MicroMessage) msg;
        logger.error("receive client message : " + message.getMessage());
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.fireExceptionCaught(cause);
        ctx.close();
    }
}

Client handler code:

public class MicroClientHandler extends ChannelInboundHandlerAdapter {
    private static final Logger logger = LoggerFactory.getLogger(MicroClientHandler.class);
    private ChannelHandlerContext ctx;

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        this.ctx = ctx;
        for (int i = 0; i < 10; i++) {
            String message = "message timestamp " + System.currentTimeMillis() + " " + i;
            MicroMessage microMessage = new MicroMessage();
            microMessage.setMessage(message);
            ctx.writeAndFlush(microMessage);
            System.out.println("send client message : " + message);
        }
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        MicroMessage message = (MicroMessage) msg;
    }

    public void send(MicroMessage microMessage) {
        if (ctx != null) {
            ctx.writeAndFlush(microMessage);
        } else {
            logger.error("ctx is not prepared well now...");
        }
    }
}

Message decoder code:

public class MicroMessageDecoder extends LengthFieldBasedFrameDecoder {
    public MicroMessageDecoder(int maxFrameLength, int lengthFieldOffset, int lengthFieldLength) {
        super(maxFrameLength, lengthFieldOffset, lengthFieldLength);
    }

    @Override
    public Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
        try {
            byte[] dstBytes = new byte[in.readableBytes()];
            in.readBytes(dstBytes, 0, in.readableBytes());
            MicroMessage microMessage = MicroSerializeUtil.deserialize(dstBytes, MicroMessage.class);
            return microMessage;
        } catch (Exception e) {
            System.out.println("exception when decoding: " + e);
            return null;
        }
    }
}

Message encoder code:

public class MicroMessageEncoder extends MessageToByteEncoder<MicroMessage> {
    @Override
    protected void encode(ChannelHandlerContext ctx, MicroMessage msg, ByteBuf out) throws Exception {
        out.writeBytes(MicroSerializeUtil.serialize(msg));
    }
}

In the SerializeUtil code, I use Protostuff for my codec:

public class MicroSerializeUtil {
    private static class SerializeData {
        private Object target;
    }

    @SuppressWarnings("unchecked")
    public static byte[] serialize(Object object) {
        SerializeData serializeData = new SerializeData();
        serializeData.target = object;
        Class<SerializeData> serializeDataClass = (Class<SerializeData>) serializeData.getClass();
        LinkedBuffer linkedBuffer = LinkedBuffer.allocate(1024 * 4);
        try {
            Schema<SerializeData> schema = RuntimeSchema.getSchema(serializeDataClass);
            return ProtobufIOUtil.toByteArray(serializeData, schema, linkedBuffer);
        } catch (Exception e) {
            throw new IllegalStateException(e.getMessage(), e);
        } finally {
            linkedBuffer.clear();
        }
    }

    @SuppressWarnings("unchecked")
    public static <T> T deserialize(byte[] data, Class<T> clazz) {
        try {
            Schema<SerializeData> schema = RuntimeSchema.getSchema(SerializeData.class);
            SerializeData serializeData = schema.newMessage();
            ProtobufIOUtil.mergeFrom(data, serializeData, schema);
            return (T) serializeData.target;
        } catch (Exception e) {
            throw new IllegalStateException(e.getMessage(), e);
        }
    }
}

The server test is as follows:

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = {"classpath*:spring-config.xml"})
public class ServerTest {
    @Resource
    private MicroServer microServer;

    @Test
    public void testServer() throws Exception {
        microServer.Start(9023);
        System.in.read();
    }
}

The client test is as follows:

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = {"classpath*:spring-config.xml"})
public class ClientTest {
    @Resource
    private MicroClient microClient;

    @Before
    public void init() throws InterruptedException {
        microClient.Connect("127.0.0.1", 9023);
    }

    @Test
    public void testClient() throws Exception {
        System.in.read();
    }
}

The server output is as follows:

2020-06-10 17:21:54,970 INFO  [nioEventLoopGroup-3-1] micro.MicroServer (MicroServer.java:82) - 服务端[127.0.0.1:9023]已上线...
2020-06-10 17:22:00,232 ERROR [nioEventLoopGroup-4-1] micro.MicroServerHandler (MicroServerHandler.java:21) - receive client message : message timestamp 1591780920120 9

The client output is as follows:

2020-06-10 17:21:59,988 INFO  [main] micro.MicroClient (MicroClient.java:67) - 客户端[/127.0.0.1:49299]已连接...
send client message : message timestamp 1591780919987 0
send client message : message timestamp 1591780920117 1
send client message : message timestamp 1591780920117 2
send client message : message timestamp 1591780920118 3
send client message : message timestamp 1591780920118 4
send client message : message timestamp 1591780920118 5
send client message : message timestamp 1591780920119 6
send client message : message timestamp 1591780920119 7
send client message : message timestamp 1591780920119 8
send client message : message timestamp 1591780920120 9

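The logs show that the client sent 10 messages but the server received only one. What is wrong with my code? Could it be that I am misusing Protostuff?

Edit: In the MicroMessageDecoder class, I inspected the dstBytes variable in the debugger and got the following: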

P�*com.tw.components.micro.codec.MicroMessage
!message timestamp 1591783477591 0
P�*com.tw.components.micro.codec.MicroMessage
!message timestamp 1591783477598 1
P�*com.tw.components.micro.codec.MicroMessage
!message timestamp 1591783477598 2
P�*com.tw.components.micro.codec.MicroMessage
!message timestamp 1591783477598 3
P�*com.tw.components.micro.codec.MicroMessage
!message timestamp 1591783477598 4
P�*com.tw.components.micro.codec.MicroMessage
!message timestamp 1591783477598 5
P�*com.tw.components.micro.codec.MicroMessage
!message timestamp 1591783477598 6
P�*com.tw.components.micro.codec.MicroMessage
!message timestamp 1591783477598 7
P�*com.tw.components.micro.codec.MicroMessage
!message timestamp 1591783477598 8
P�*com.tw.components.micro.codec.MicroMessage
!message timestamp 1591783477598 9

All the messages are there, so maybe the decoder is working incorrectly?

You should check the ChannelFuture returned by writeAndFlush to find out whether the write failed.

To do so, add a ChannelFutureListener to it:

channel.writeAndFlush(msg).addListener(new ChannelFutureListener() {

    @Override
    public void operationComplete(ChannelFuture future) {
        if (future.isSuccess()) {
            ...
        } else {
            Throwable cause = future.cause();
            ...
        }
    }
});

Finally solved the problem. The 10 messages sent by the client arrive at the server together as one chunk of bytes, so we need to add a LengthFieldBasedFrameDecoder to split them into individual messages.

public class MicroLengthDecoder extends LengthFieldBasedFrameDecoder {
    public MicroLengthDecoder(int maxFrameLength, int lengthFieldOffset, int lengthFieldLength) {
        super(maxFrameLength, lengthFieldOffset, lengthFieldLength);
    }
}
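
To make the role of the length header concrete, here is a minimal JDK-only sketch of length-prefixed framing, which is the general idea behind a frame decoder configured with a 4-byte length field at offset 0. It does not use Netty at all: the class name LengthPrefixSketch and the methods frame and split are hypothetical, and plain strings stand in for the serialized MicroMessage bytes.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration class, not part of the original project.
public class LengthPrefixSketch {

    /** Prepends a 4-byte big-endian length header to the payload. */
    public static byte[] frame(byte[] payload) {
        return ByteBuffer.allocate(4 + payload.length)
                .putInt(payload.length)
                .put(payload)
                .array();
    }

    /** Splits a byte stream containing complete frames back into payloads. */
    public static List<byte[]> split(byte[] stream) {
        List<byte[]> payloads = new ArrayList<>();
        ByteBuffer in = ByteBuffer.wrap(stream);
        while (in.remaining() >= 4) {
            in.mark();
            int length = in.getInt();
            if (length < 0 || in.remaining() < length) {
                // Incomplete or invalid frame: a real decoder would wait for more bytes.
                in.reset();
                break;
            }
            byte[] payload = new byte[length];
            in.get(payload);
            payloads.add(payload);
        }
        return payloads;
    }

    public static void main(String[] args) {
        // Ten writes end up back to back in one byte stream, as in the debug dump.
        ByteArrayOutputStream wire = new ByteArrayOutputStream();
        for (int i = 0; i < 10; i++) {
            byte[] framed = frame(("message " + i).getBytes(StandardCharsets.UTF_8));
            wire.write(framed, 0, framed.length);
        }
        List<byte[]> payloads = split(wire.toByteArray());
        System.out.println(payloads.size() + " frames recovered");
        for (byte[] payload : payloads) {
            System.out.println(new String(payload, StandardCharsets.UTF_8));
        }
    }
}
```

Without the header, the receiver has no way to tell where one serialized message ends and the next begins, which matches the symptom of a decoder that consumes every readable byte and produces a single object.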

Then add MicroLengthDecoder to the pipeline as follows:

.handler(new ChannelInitializer<SocketChannel>() {
    @Override
    protected void initChannel(SocketChannel channel) throws Exception {
        // Split messages; the length header is 4 bytes
        channel.pipeline().addLast("nettyLengthDecoder", new MicroLengthDecoder(1024 * 1024, 0, 4));
        channel.pipeline().addLast("nettyMessageEncoder", new MicroMessageEncoder());
        channel.pipeline().addLast("nettyMessageDecoder", new MicroMessageDecoder());
        ....
    }
});

The following two classes need to be changed:

public class MicroMessageDecoder extends ByteToMessageDecoder {
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> obj) throws Exception {
        try {
            byte[] originBytes = new byte[in.readableBytes()];
            in.readBytes(originBytes, 0, in.readableBytes());
            // Strip the 4-byte header
            byte[] dstBytes = new byte[originBytes.length - 4];
            System.arraycopy(originBytes, 4, dstBytes, 0, dstBytes.length);
            MicroMessage microMessage = MicroSerializeUtil.deserialize(dstBytes, MicroMessage.class);
            obj.add(microMessage);
        } catch (Exception e) {
            System.out.println("exception when decoding: " + e);
        }
    }
}

public class MicroMessageEncoder extends MessageToByteEncoder<MicroMessage> {
    @Override
    protected void encode(ChannelHandlerContext ctx, MicroMessage msg, ByteBuf out) throws Exception {
        byte[] data = MicroSerializeUtil.serialize(msg);
        ByteBuf buf = Unpooled.copiedBuffer(intToBytes(data.length), data);
        out.writeBytes(buf);
    }

    /**
     * Converts the body length into the 4-byte big-endian header that is
     * prepended to the message body so that messages can be split.
     * @param num
     * @return
     */
    public byte[] intToBytes(int num) {
        byte[] bytes = new byte[4];
        for (int i = 0; i < 4; i++) {
            bytes[i] = (byte) (num >> (24 - i * 8));
        }
        return bytes;
    }
}
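
The header handling in the two classes above can probably also be delegated to handlers that ship with Netty. The sketch below assumes Netty 4.1 on the classpath and uses LengthFieldPrepender(4) to write the header and the five-argument LengthFieldBasedFrameDecoder constructor with initialBytesToStrip set to 4 to remove it again. The class BuiltInFramingSketch and the method roundTrip are hypothetical, strings stand in for the serialized MicroMessage bytes, and an EmbeddedChannel replaces the real socket so the round trip can run in one process.

```java
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration class, not part of the original project.
public class BuiltInFramingSketch {

    /** Encodes every message, glues the bytes together, and decodes them again. */
    public static List<String> roundTrip(List<String> messages) {
        EmbeddedChannel channel = new EmbeddedChannel(
                // Outbound: prepend a 4-byte length header to each buffer.
                new LengthFieldPrepender(4),
                // Inbound: length field at offset 0, 4 bytes long, no adjustment,
                // and strip the 4 header bytes from each emitted frame.
                new LengthFieldBasedFrameDecoder(1024 * 1024, 0, 4, 0, 4));

        for (String message : messages) {
            channel.writeOutbound(Unpooled.copiedBuffer(message, StandardCharsets.UTF_8));
        }

        // Simulate TCP coalescing: everything written arrives as one chunk.
        ByteBuf wire = Unpooled.buffer();
        ByteBuf part;
        while ((part = channel.readOutbound()) != null) {
            wire.writeBytes(part);
            part.release();
        }
        channel.writeInbound(wire);

        // Each inbound object is now exactly one message body without its header.
        List<String> decoded = new ArrayList<>();
        ByteBuf frame;
        while ((frame = channel.readInbound()) != null) {
            decoded.add(frame.toString(StandardCharsets.UTF_8));
            frame.release();
        }
        channel.finish();
        return decoded;
    }

    public static void main(String[] args) {
        System.out.println(roundTrip(Arrays.asList("message 0", "message 1", "message 2")));
    }
}
```

With this arrangement, a message decoder placed after the frame decoder would receive only the body bytes, so the manual System.arraycopy and intToBytes steps would likely become unnecessary. Treat this as one possible arrangement rather than the required one.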
