我想在Netty nio中创建一个包含两个客户端和一个服务器的通信系统。更具体地说,首先,我希望当两个客户端与服务器连接时,能够从服务器发送消息,然后能够在两个客户端之间交换数据。我使用的是本例中提供的代码。我在代码中的修改可以在这里找到:链接
服务器处理程序中的channelRead似乎在连接第一个客户端时工作,因此它总是返回1,但在连接第二个客户端时不会更改为2。当两个客户端都连接到服务器时,我如何从服务器正确检查?如何从客户端的主要功能中动态读取此值?那么,让两个客户端进行通信的最佳方式是什么?
EDIT1:显然,客户端服务似乎正在运行并直接关闭,所以每次我运行时,都会连接一个新的NettyClient,但之后连接会关闭。所以计数器总是从零到一。正如下面的评论中所建议的那样,我在同一个端口中使用telnet进行了测试,计数器似乎正常增加,但NettyClient服务编号为
EDIT2:我收到的问题似乎来自future.addListener(ChannelFutureListener.CLOSE);
,它位于ProcessingHandler class
中的channelRead
中。当我把它评论出来时,代码似乎是有效的。然而,我不确定评论出来会有什么后果。此外,我想从我的主要功能客户端检查返回消息的具体时间是两个。我如何创建一个方法来等待来自服务器的特定消息,同时阻止主要功能。
static EventLoopGroup workerGroup = new NioEventLoopGroup();
static Promise<Object> promise = workerGroup.next().newPromise();
public static void callClient() throws Exception {
String host = "localhost";
int port = 8080;
try {
Bootstrap b = new Bootstrap();
b.group(workerGroup);
b.channel(NioSocketChannel.class);
b.option(ChannelOption.SO_KEEPALIVE, true);
b.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new RequestDataEncoder(), new ResponseDataDecoder(), new ClientHandler(promise));
}
});
ChannelFuture f = b.connect(host, port).sync();
} finally {
//workerGroup.shutdownGracefully();
}
}
我希望在主函数内部调用方法并返回结果,当它为2时,继续执行主函数。但是,我不能在while内部调用callClient,因为它将在同一个客户端上运行多次。
callBack();
while (true) {
Object msg = promise.get();
System.out.println("Case1: the connected clients is not two");
int ret = Integer.parseInt(msg.toString());
if (ret == 2){
break;
}
}
System.out.println("Case2: the connected clients is two");
// proceed with the main functionality
如何更新第一个客户端的promise变量。当我运行两个客户端时,对于第一个客户端,我总是收到消息:
Case1:连接的客户端不是两个
似乎承诺没有正常更新,而对于第二个客户端,我总是收到:
Case2:连接的客户端是两个
如果我的内存正确,ChannelHandlerContext是每个通道一个,并且它的管道中可以有多个ChannelHandlers。通道变量是处理程序类的实例变量。然后为每个连接创建一个新的ProcessingHandler实例。因此,一旦初始化,每个channels
变量中都将有一个且只有一个连接,即为其创建的连接。
请参阅服务器代码(NettyServer.java)中initChannel函数中的new ProcessingHandler()
您可以将channels
变量设置为静态,以便在ProcessingHandler实例之间共享。或者,您可以在其他地方创建一个ProcessingHandler实例(例如,作为run()
函数中的局部变量),然后将该实例传递给addLast
调用,而不是new ProcessingHandler()
。
为什么ChannelGroup通道的大小总是一个。即使我连接更多的客户?
因为每个新的Channel
(客户端)都会调用子ChannelInitializer
。您正在创建ProcessingHandler
的新实例,因此每个通道都可以看到自己的ChannelGroup
实例。
解决方案1-通道属性
使用Attribute并将其与Channel
关联。
在某个地方创建属性(比如在Constants
类内部):
public static final AttributeKey<ChannelGroup> CH_GRP_ATTR =
AttributeKey.valueOf(SomeClass.class.getName());
现在,创建将由ProcessingHandler
:的所有实例使用的ChannelGroup
final ChannelGroup channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
在NettyServer:中更新您的孩子ChannelInitializer
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(
new RequestDecoder(),
new ResponseDataEncoder(),
new ProcessingHandler());
ch.attr(Constants.CH_GRP_ATTR).set(channels);
}
现在您可以在处理程序中访问ChannelGroup的实例,如下所示:
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
final ChannelGroup channels = ctx.channel().attr(Constants.CH_GRP_ATTR).get();
channels.add(ctx.channel());
这将起作用,因为每次新客户端连接时,都会使用对ChannelGroup
的相同引用来调用ChannelInitializer。
解决方案2-静态场
如果将ChannelGroup
声明为静态,则所有类实例都将看到相同的ChannelGroup
实例:
private static final ChannelGroup channels =
new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
解决方案3-传播共享实例
在ProcessingHandler
:的构造函数中引入参数
private final ChannelGroup channels;
public ProcessingHandler(ChannelGroup chg) {
this.channels = chg;
}
现在,在NettyServer类中创建ChannelGroup
的实例,并将其传播到ProcessingHandler构造函数:
final ChannelGroup channels = new
DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(
new RequestDecoder(),
new ResponseDataEncoder(),
new ProcessingHandler(channels)); // <- here
}
就我个人而言,我会选择第一个解决方案,因为
- 它清楚地将ChannelGroup与Channel上下文关联起来
- 您可以在其他处理程序中访问相同的ChannelGroup
- 您可以有多个服务器实例(在同一JVM中的不同端口上运行)