如何将响应与Netty中的请求关联起来



我有一个异步方法在我的客户端发送一个udp请求到服务器,并返回一个承诺。我需要以某种方式将此Promise传递给我的一个入站处理程序,以便它能够将其标记为"完成",然后通过Promise. setsuccess (result)发送回响应。

你到底是怎么做的?一旦入站处理程序接收到响应,如何将请求实例与来自处理程序的响应关联起来?

这个网站上建议的一些方法对我来说也不起作用:

  • 是否有一种方法可以从channel.write()中返回自定义承诺?

  • 如何使用netty客户端获取服务器响应

我的代码:

客户:

private final EventLoopGroup group;
private final Bootstrap bootstrap;
private Channel channel;
private BlockingQueue<GameQuery> requestQueue;
public SourceServerQueryClient() {
    group = new NioEventLoopGroup();
    bootstrap = new Bootstrap();
    configureBootstrap(bootstrap);
    try {
        channel = bootstrap.bind(0).sync().channel();
    } catch (InterruptedException e) {
        log.error("InterruptedException", e);
    }
}
public void configureBootstrap(Bootstrap bootstrap) {
    //Contains our request queue
    requestQueue = new ArrayBlockingQueue<>(50);
    //Configure our bootstrap
    bootstrap.group(group).channel(NioDatagramChannel.class)
            .handler(new ChannelInitializer<NioDatagramChannel>() {
                @Override
                protected void initChannel(NioDatagramChannel ch) throws Exception {
                    ChannelPipeline pipeline = ch.pipeline();
                    pipeline.addLast(new ErrorHandler());
                    pipeline.addLast(new SourcePacketVerifierHandler());
                    pipeline.addLast(new SourceQueryEncoder());
                    pipeline.addLast(new MasterServerDecoder());
                    pipeline.addLast(new SourceServerInfoDecoder());
                    pipeline.addLast(new QueryResponseHandler(requestQueue));
                }
            });
}
public Promise<SourceServer> getServerDetails(InetSocketAddress address, QueryCallback<SourceServer> callback) {
    Promise<SourceServer> p = sendQuery(new ServerInfoQuery(address));
    p.addListener(future -> {
        if (future.isSuccess())
            callback.onReceive(p.get());
    });
    return p;
}
private Promise sendQuery(GameQuery query) {
    Promise promise = channel.eventLoop().newPromise();
    query.setPromise(promise);
    ChannelFuture f = null;
    try {
        requestQueue.put(query);
        f = channel.writeAndFlush(query).sync();
    } catch (InterruptedException e) {
        if (f != null)
            promise.setFailure(f.cause());
    }
    return promise;
}
public static void main(String[] args) throws InterruptedException {
    try (SourceServerQueryClient client = new SourceServerQueryClient()) {
        Promise query1 = client.getServerDetails(new InetSocketAddress("169.38.68.44", 27015), msg -> log.info("REPLY FROM SERVER: {}, EXPECTED: 169.38.68.44:27015", msg.toString()));
        Promise query2 = client.getServerDetails(new InetSocketAddress("112.211.234.23", 27016), msg -> log.info("REPLY FROM SERVER: {}, EXPECTED: 112.211.234.23:27016", msg.toString()));
        query2.awaitUninterruptibly();
        log.info("Done");
    } catch (IOException e) {
        log.error(e.getMessage(), e);
    }
}
入站处理程序:

public class QueryResponseHandler<T extends GameQuery> extends SimpleChannelInboundHandler<Object> {
    private static final Logger log = LoggerFactory.getLogger(QueryResponseHandler.class);
    private BlockingQueue<T> requestQueue;
    public QueryResponseHandler(BlockingQueue<T> requestQueue) {
        this.requestQueue = requestQueue;
    }
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
        log.debug("From QueryResponseHandler: {}", msg);
        T request = requestQueue.poll(); 
        Promise p = request.getPromise();
        if (request != null) {
            p.setSuccess(msg);
        } else
            p.setFailure(new BufferUnderflowException());
    }
}
在我的测试中,我同时运行两个请求。第一个不应该工作,因为它是一个死服务器。第二个调用应该返回一个响应。输出:

REPLY FROM SERVER: 112.211.234.23:27016, EXPECTED: 169.38.68.44:27015

正如你所看到的,由于设计的原因,它没有像预期的那样工作。第一个查询接收到用于第二个查询的响应。

我已经想不出如何正确设计这个了,所以任何输入都会非常感激!谢谢。

也许添加请求的"id",以便当您"轮询"它时,您确实可以获得正确的请求(因此通过某种队列映射而不仅仅是单个队列)?

这个"id"可以基于您显示的输出消息(也可能不是?)。

WDYT吗?

由于UDP是无状态的,您需要提供一个消息ID以便能够关联和跟踪请求的答案。

如果你想要有状态通信,为什么不直接使用TCP呢?

最新更新