Netty Nio java中的通信



我想在Netty nio中创建一个包含两个客户端和一个服务器的通信系统。更具体地说,首先,我希望当两个客户端与服务器连接时,能够从服务器发送消息,然后能够在两个客户端之间交换数据。我使用的是本例中提供的代码。我在代码中的修改可以在这里找到:链接

服务器处理程序中的channelRead似乎在连接第一个客户端时工作,因此它总是返回1,但在连接第二个客户端时不会更改为2。当两个客户端都连接到服务器时,我如何从服务器正确检查?如何从客户端的主要功能中动态读取此值?那么,让两个客户端进行通信的最佳方式是什么?

EDIT1:显然,客户端服务似乎正在运行并直接关闭,所以每次我运行时,都会连接一个新的NettyClient,但之后连接会关闭。所以计数器总是从零到一。正如下面的评论中所建议的那样,我在同一个端口中使用telnet进行了测试,计数器似乎正常增加,但NettyClient服务编号为

EDIT2:我收到的问题似乎来自future.addListener(ChannelFutureListener.CLOSE);,它位于ProcessingHandler class中的channelRead中。当我把它评论出来时,代码似乎是有效的。然而,我不确定评论出来会有什么后果。此外,我想从我的主要功能客户端检查返回消息的具体时间是两个。我如何创建一个方法来等待来自服务器的特定消息,同时阻止主要功能。

static EventLoopGroup workerGroup = new NioEventLoopGroup();
static Promise<Object> promise = workerGroup.next().newPromise(); 
public static void callClient() throws Exception {
String host = "localhost";
int port = 8080;
try {
Bootstrap b = new Bootstrap();
b.group(workerGroup);
b.channel(NioSocketChannel.class);
b.option(ChannelOption.SO_KEEPALIVE, true);
b.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new RequestDataEncoder(), new ResponseDataDecoder(), new ClientHandler(promise));
}
});
ChannelFuture f = b.connect(host, port).sync();
} finally {
//workerGroup.shutdownGracefully();
}
}

我希望在主函数内部调用方法并返回结果,当它为2时,继续执行主函数。但是,我不能在while内部调用callClient,因为它将在同一个客户端上运行多次。

callBack();
while (true) {
Object msg = promise.get();
System.out.println("Case1: the connected clients is not two");
int ret = Integer.parseInt(msg.toString());
if (ret == 2){
break;
}
}
System.out.println("Case2: the connected clients is two");
// proceed with the main functionality

如何更新第一个客户端的promise变量。当我运行两个客户端时,对于第一个客户端,我总是收到消息:

Case1:连接的客户端不是两个

似乎承诺没有正常更新,而对于第二个客户端,我总是收到:

Case2:连接的客户端是两个

如果我的内存正确,ChannelHandlerContext是每个通道一个,并且它的管道中可以有多个ChannelHandlers。通道变量是处理程序类的实例变量。然后为每个连接创建一个新的ProcessingHandler实例。因此,一旦初始化,每个channels变量中都将有一个且只有一个连接,即为其创建的连接。

请参阅服务器代码(NettyServer.java)中initChannel函数中的new ProcessingHandler()

您可以将channels变量设置为静态,以便在ProcessingHandler实例之间共享。或者,您可以在其他地方创建一个ProcessingHandler实例(例如,作为run()函数中的局部变量),然后将该实例传递给addLast调用,而不是new ProcessingHandler()

为什么ChannelGroup通道的大小总是一个。即使我连接更多的客户?

因为每个新的Channel(客户端)都会调用子ChannelInitializer。您正在创建ProcessingHandler的新实例,因此每个通道都可以看到自己的ChannelGroup实例。

解决方案1-通道属性

使用Attribute并将其与Channel关联。

在某个地方创建属性(比如在Constants类内部):

public static final AttributeKey<ChannelGroup> CH_GRP_ATTR = 
AttributeKey.valueOf(SomeClass.class.getName());

现在,创建将由ProcessingHandler:的所有实例使用的ChannelGroup

final ChannelGroup channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

在NettyServer:中更新您的孩子ChannelInitializer

@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(
new RequestDecoder(), 
new ResponseDataEncoder(), 
new ProcessingHandler());
ch.attr(Constants.CH_GRP_ATTR).set(channels);
}

现在您可以在处理程序中访问ChannelGroup的实例,如下所示:

@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
final ChannelGroup channels = ctx.channel().attr(Constants.CH_GRP_ATTR).get();
channels.add(ctx.channel());

这将起作用,因为每次新客户端连接时,都会使用对ChannelGroup的相同引用来调用ChannelInitializer。

解决方案2-静态场

如果将ChannelGroup声明为静态,则所有类实例都将看到相同的ChannelGroup实例:

private static final ChannelGroup channels =
new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

解决方案3-传播共享实例

ProcessingHandler:的构造函数中引入参数

private final ChannelGroup channels;
public ProcessingHandler(ChannelGroup chg) {
this.channels = chg;
}

现在,在NettyServer类中创建ChannelGroup的实例,并将其传播到ProcessingHandler构造函数:

final ChannelGroup channels = new 
DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(
new RequestDecoder(), 
new ResponseDataEncoder(), 
new ProcessingHandler(channels)); // <- here
}

就我个人而言,我会选择第一个解决方案,因为

  • 它清楚地将ChannelGroup与Channel上下文关联起来
  • 您可以在其他处理程序中访问相同的ChannelGroup
  • 您可以有多个服务器实例(在同一JVM中的不同端口上运行)

最新更新