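I am trying to learn Netty 4.x by implementing the following simple example. I have a server and a client. At some point the client wants to know the current date and time, so it asks the server "What time is it?". When the server recognizes the question, it replies with the current date and time.
My implementation is as follows: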
TimeClientInboundHandler.java
public class TimeClientInboundHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf byteBuf = (ByteBuf) msg;
        try {
            long currentTimeMillis = (byteBuf.readUnsignedInt() - 2208988800L) * 1000L;
            System.out.println(new Date(currentTimeMillis));
            ctx.close();
        } finally {
            byteBuf.release();
        }
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        ctx.write("What time is it?");
        ctx.flush();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}
TimeServerInboundHandler.java
public class TimeServerInboundHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf byteBuf = (ByteBuf) msg;
        try {
            byteBuf.readCharSequence(1024, StandardCharsets.UTF_8);
            System.out.println(byteBuf.toString());
        } finally {
            ((ByteBuf) msg).release();
        }
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        final ByteBuf byteBuf = ctx.alloc().buffer(4);
        byteBuf.writeInt((int) (System.currentTimeMillis() / 1000L + 2208988800L));
        final ChannelFuture f = ctx.writeAndFlush(byteBuf);
        f.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                assert f == future;
                ctx.close();
            }
        });
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}
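However, I am not getting the expected result. Specifically, on the server side the question "What time is it?" is never printed to the console.
What am I doing wrong?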
Cause
There are two causes for this problem.
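First, the server never receives the message because the client writes it in the wrong form. Without an encoder in the pipeline, Netty's transport can only write a ByteBuf (or a FileRegion), so writing a raw String fails and nothing reaches the server. Instead of
ctx.write("What time is it?");
use
ctx.write(Unpooled.copiedBuffer("What time is it?", StandardCharsets.UTF_8));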
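Second, the channelActive() method in TimeServerInboundHandler.java should be removed. channelActive() fires as soon as the connection becomes active, and in the original code it writes the time and then closes the channel, so channelRead() never gets a chance to fire for the client's question.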
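To make the first cause more concrete, here is a minimal plain-JDK sketch of the string to bytes to string round trip that the two ends of the connection have to agree on. It is only an illustration: java.nio.ByteBuffer stands in for Netty's ByteBuf, and the class and method names are made up for this example. It also shows why calling toString() without a charset on a buffer tends to print a description of the buffer rather than its content.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical demo class; java.nio.ByteBuffer stands in for Netty's ByteBuf.
class EncodeDecodeSketch {

    // Client side: turn the question into bytes before it goes on the wire.
    public static ByteBuffer encode(String text) {
        return ByteBuffer.wrap(text.getBytes(StandardCharsets.UTF_8));
    }

    // Server side: decode the received bytes with the same charset.
    public static String decode(ByteBuffer buffer) {
        // duplicate() leaves the caller's read position untouched.
        return StandardCharsets.UTF_8.decode(buffer.duplicate()).toString();
    }

    public static void main(String[] args) {
        ByteBuffer wire = encode("What time is it?");
        // Prints a description of the buffer (position, limit, capacity), not the text.
        System.out.println(wire);
        // Prints the decoded question.
        System.out.println(decode(wire));
    }
}
```

In the Netty handlers, Unpooled.copiedBuffer(text, StandardCharsets.UTF_8) plays the role of encode() and byteBuf.toString(StandardCharsets.UTF_8) plays the role of decode().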
Solution
TimeClientInboundHandler.java
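import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import java.nio.charset.StandardCharsets;

public class TimeClientInboundHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ByteBuf byteBuf = (ByteBuf) msg;
        try {
            // Decode the server's reply as UTF-8 text, then close the connection.
            System.out.println("Client received: " + byteBuf.toString(StandardCharsets.UTF_8));
            ctx.close();
        } finally {
            byteBuf.release();
        }
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        // Send the question as bytes; a raw String cannot be written without an encoder.
        ctx.writeAndFlush(Unpooled.copiedBuffer("What time is it?", StandardCharsets.UTF_8));
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}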
TimeServerInboundHandler.java
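import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import java.nio.charset.StandardCharsets;

public class TimeServerInboundHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ByteBuf byteBuf = (ByteBuf) msg;
        try {
            // Print the client's question, then reply with the current time as UTF-8 text.
            System.out.println("Server received: " + byteBuf.toString(StandardCharsets.UTF_8));
            String answer = Utils.getCurrentTime();
            ctx.writeAndFlush(Unpooled.copiedBuffer(answer, StandardCharsets.UTF_8));
        } finally {
            byteBuf.release();
        }
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) {
        // Flush anything still pending and close the channel once that write completes.
        ctx.writeAndFlush(Unpooled.EMPTY_BUFFER)
                .addListener(ChannelFutureListener.CLOSE);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}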
Utils.java
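import java.util.Date;

public class Utils {
    public static String getCurrentTime() {
        // The 1900-epoch offset is added and then subtracted, so the net effect
        // is the current time truncated to whole seconds.
        long currentTimeMillis = ((System.currentTimeMillis() / 1000L + 2208988800L) - 2208988800L) * 1000L;
        return new Date(currentTimeMillis).toString();
    }
}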
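Note that the solution replies with a formatted string, so the 4-byte binary value from the question is no longer sent. If you would rather keep that binary format, the arithmetic from the question's handlers can be sketched on its own, without Netty. This is only a sketch under assumptions: the class and method names are hypothetical, java.nio.ByteBuffer replaces ByteBuf, and the constant 2208988800L is taken to be the number of seconds between 1900-01-01 and 1970-01-01, as in the question.

```java
import java.nio.ByteBuffer;
import java.util.Date;

// Hypothetical helper; mirrors the arithmetic used in the question's handlers.
class TimeProtocolSketch {
    // Seconds between 1900-01-01 and the Unix epoch, 1970-01-01.
    static final long EPOCH_OFFSET_SECONDS = 2208988800L;

    // Server side: Unix millis -> 4-byte big-endian seconds since 1900.
    public static byte[] encode(long unixMillis) {
        long secondsSince1900 = unixMillis / 1000L + EPOCH_OFFSET_SECONDS;
        // The int cast keeps the low 32 bits; the value is read back as unsigned.
        return ByteBuffer.allocate(4).putInt((int) secondsSince1900).array();
    }

    // Client side: read the 4 bytes as an unsigned int and convert to Unix millis.
    public static long decode(byte[] wire) {
        long secondsSince1900 = Integer.toUnsignedLong(ByteBuffer.wrap(wire).getInt());
        return (secondsSince1900 - EPOCH_OFFSET_SECONDS) * 1000L;
    }

    public static void main(String[] args) {
        long now = System.currentTimeMillis();
        // Only sub-second precision is lost on the round trip.
        System.out.println(new Date(decode(encode(now))));
    }
}
```

The encode() side corresponds to byteBuf.writeInt(...) in the original server handler, and decode() corresponds to byteBuf.readUnsignedInt() in the original client handler.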