使响应的顺序与Netty中请求的顺序相匹配



上下文

我正在编写一个Netty应用程序(Netty 4(,其中处理每条消息可能需要一些时间。作为我的意思的一个例子,我创建了一个EchoHandler,它会用原始消息进行响应,尽管有时会经过短暂的延迟:

public class EchoHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
var message = (String) msg;
if (message.equals("delay")) {
ctx.executor().schedule(() -> ctx.writeAndFlush(message), 400, TimeUnit.MILLISECONDS);
} else {
ctx.writeAndFlush(message);
}
}
}

问题

使用此代码,不能保证响应的顺序与传入请求的顺序相同。事实上,如果第一消息由字符串"0"组成;延迟;和其他字符串的第二个,响应的顺序将颠倒!我写了一个测试来说明这一点:

public class Tests {
@Test
public void test() throws ExecutionException, InterruptedException {
var channel = new EmbeddedChannel(new EchoHandler());
channel.writeInbound("delay", "second message");
// Let some time pass and process any scheduled tasks
Thread.sleep(500);
channel.runPendingTasks();
// The response to the last message comes first!
var firstMessage = (String) channel.readOutbound();
Assertions.assertEquals("second message", firstMessage);
// The response to the first message comes second!
var secondMessage = (String) channel.readOutbound();
Assertions.assertEquals("delay", secondMessage);
}
}

问题

我正在Netty中寻找一种内置的方法,以确保传出响应的顺序与传入消息的顺序匹配。使用库的假想扩展,我将重写处理程序如下:

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
var message = (String) msg;
ctx.pauseMessageProcessing(); // Upcoming messages will be temporarily queued

if (message.equals("delay")) {
ctx.executor().schedule(() -> {
ctx.writeAndFlush(message);
ctx.resumeMessageProcessing(); // After flushing we can resume message processing
}, 400, TimeUnit.MILLISECONDS);
} else {
ctx.writeAndFlush(message);
ctx.resumeMessageProcessing(); // After flushing we can resume message processing
}
}

Netty是否提供了开箱即用的东西?这听起来像是一个常见的用例,我更愿意依赖经过战斗测试的代码,而不是编写自己的队列实现。

虽然不是内置的,但这个问题的一个看似惯用的解决方案是自定义MessageToMessageCodec。基于使用String作为消息类型的示例,我编写了以下为您处理排队的编解码器:

public class QueuingCodec extends MessageToMessageCodec<String, String> {
private final Queue<String> messageQueue = new ArrayDeque<>();
private boolean processingMessage = false;
@Override
protected void encode(ChannelHandlerContext ctx, String s, List<Object> list)
throws Exception {
// Pass the message on to output
list.add(s);
// Allow processing more messages
processingMessage = false;
// Send the next message in the queue if available
if (!messageQueue.isEmpty()) {
processingMessage = true;
var message = messageQueue.poll();
// Pass the message on to input
ctx.executor().execute(() -> {
ctx.fireChannelRead(message);
});
}
}
@Override
protected void decode(ChannelHandlerContext ctx, String s, List<Object> list)
throws Exception {
if (processingMessage) {
// Store message for later processing
messageQueue.add(s);
} else {
// Pass data on to input
list.add(s);
// Prevent processing of new messages
processingMessage = true;
}
}
}

最新更新