我有一个异步方法在我的客户端发送一个udp请求到服务器,并返回一个承诺。我需要以某种方式将此Promise传递给我的一个入站处理程序,以便它能够将其标记为"完成",然后通过Promise. setsuccess (result)发送回响应。
你到底是怎么做的?一旦入站处理程序接收到响应,如何将请求实例与来自处理程序的响应关联起来?
这个网站上建议的一些方法对我来说也不起作用:
-
是否有一种方法可以从channel.write()中返回自定义承诺?
-
如何使用netty客户端获取服务器响应
客户:
private final EventLoopGroup group;
private final Bootstrap bootstrap;
private Channel channel;
private BlockingQueue<GameQuery> requestQueue;
public SourceServerQueryClient() {
group = new NioEventLoopGroup();
bootstrap = new Bootstrap();
configureBootstrap(bootstrap);
try {
channel = bootstrap.bind(0).sync().channel();
} catch (InterruptedException e) {
log.error("InterruptedException", e);
}
}
public void configureBootstrap(Bootstrap bootstrap) {
//Contains our request queue
requestQueue = new ArrayBlockingQueue<>(50);
//Configure our bootstrap
bootstrap.group(group).channel(NioDatagramChannel.class)
.handler(new ChannelInitializer<NioDatagramChannel>() {
@Override
protected void initChannel(NioDatagramChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new ErrorHandler());
pipeline.addLast(new SourcePacketVerifierHandler());
pipeline.addLast(new SourceQueryEncoder());
pipeline.addLast(new MasterServerDecoder());
pipeline.addLast(new SourceServerInfoDecoder());
pipeline.addLast(new QueryResponseHandler(requestQueue));
}
});
}
public Promise<SourceServer> getServerDetails(InetSocketAddress address, QueryCallback<SourceServer> callback) {
Promise<SourceServer> p = sendQuery(new ServerInfoQuery(address));
p.addListener(future -> {
if (future.isSuccess())
callback.onReceive(p.get());
});
return p;
}
private Promise sendQuery(GameQuery query) {
Promise promise = channel.eventLoop().newPromise();
query.setPromise(promise);
ChannelFuture f = null;
try {
requestQueue.put(query);
f = channel.writeAndFlush(query).sync();
} catch (InterruptedException e) {
if (f != null)
promise.setFailure(f.cause());
}
return promise;
}
public static void main(String[] args) throws InterruptedException {
try (SourceServerQueryClient client = new SourceServerQueryClient()) {
Promise query1 = client.getServerDetails(new InetSocketAddress("169.38.68.44", 27015), msg -> log.info("REPLY FROM SERVER: {}, EXPECTED: 169.38.68.44:27015", msg.toString()));
Promise query2 = client.getServerDetails(new InetSocketAddress("112.211.234.23", 27016), msg -> log.info("REPLY FROM SERVER: {}, EXPECTED: 112.211.234.23:27016", msg.toString()));
query2.awaitUninterruptibly();
log.info("Done");
} catch (IOException e) {
log.error(e.getMessage(), e);
}
}
入站处理程序:public class QueryResponseHandler<T extends GameQuery> extends SimpleChannelInboundHandler<Object> {
private static final Logger log = LoggerFactory.getLogger(QueryResponseHandler.class);
private BlockingQueue<T> requestQueue;
public QueryResponseHandler(BlockingQueue<T> requestQueue) {
this.requestQueue = requestQueue;
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
log.debug("From QueryResponseHandler: {}", msg);
T request = requestQueue.poll();
Promise p = request.getPromise();
if (request != null) {
p.setSuccess(msg);
} else
p.setFailure(new BufferUnderflowException());
}
}
在我的测试中,我同时运行两个请求。第一个不应该工作,因为它是一个死服务器。第二个调用应该返回一个响应。输出:REPLY FROM SERVER: 112.211.234.23:27016, EXPECTED: 169.38.68.44:27015
正如你所看到的,由于设计的原因,它没有像预期的那样工作。第一个查询接收到用于第二个查询的响应。
我已经想不出如何正确设计这个了,所以任何输入都会非常感激!谢谢。
也许添加请求的"id",以便当您"轮询"它时,您确实可以获得正确的请求(因此通过某种队列映射而不仅仅是单个队列)?
这个"id"可以基于您显示的输出消息(也可能不是?)。
WDYT吗?
由于UDP是无状态的,您需要提供一个消息ID以便能够关联和跟踪请求的答案。
如果你想要有状态通信,为什么不直接使用TCP呢?