Vertx NetServer 控制读取流



我正在尝试模仿TCP服务器,基于我必须使用的现有基础设施使用Vertx进行测试。

我正在模仿的服务器完全异步工作,并且根据缓冲区中指示请求长度的预标头知道传入缓冲区的长度。

我需要在连接到模拟 TCP 服务器的每个客户端套接字上读取传入请求的前 6 个字符。 从这个预标头中,我读取了请求的实际长度(例如,对于 xx3018,我知道请求的完整长度是 3018(。

然后,我需要根据长度读取缓冲区的其余部分,将其与响应映射匹配,并为请求返回正确的响应。

使用纯java的工作模拟服务器的示例(快速实现,因此其他开发不会被阻止:)(

public void run(String... args) throws Exception {
log.info("Starting TCP Server");
ServerSocket serverSocket = new ServerSocket(1750);
while (true) {
try {
Socket socket = serverSocket.accept();
CompletableFuture.runAsync(() -> {
Exception e = null;
while (e == null) {
try {
InputStream inputStream = socket.getInputStream();
OutputStream outputStream = socket.getOutputStream();
byte[] preHeader = new byte[6];
inputStream.read(preHeader);
String preHeaderValue = new String(preHeader);
log.info("Pre header: {}", preHeaderValue);
int length = Integer.valueOf(preHeaderValue.substring(2));
log.info("Request full length: {}", length);
byte[] request = new byte[length - 6];
inputStream.read(request);
String requestValue = new String(request);
log.info("Request: {}", requestValue);
String response = this.requestResponseProvider.getResponse(preHeaderValue + requestValue);
log.info("Response: {}", response);
outputStream.write(response.getBytes());
} catch (Exception ex) {
log.error("Encountered a problem: {}", e.getMessage());
e = ex;
}
}
});
} catch (Exception e) {
log.error("Encountered a problem: {}", e.getMessage());
}
}
}

我似乎找不到一种方法来控制输入流,就像我用普通 java 控制它一样。

在将这个问题放在一边很长时间之后,我决定玩一玩。

我记得在另一个项目中使用以下模块:https://github.com/vert-x3/vertx-tcp-eventbus-bridge

我还记得在 tcp 桥的内部协议中,它将有效负载的长度附加到通过 tcp 桥发送的缓冲区,我查看了源代码以了解它如何处理块(又名帧(

我发现了以下内容:https://github.com/vert-x3/vertx-tcp-eventbus-bridge/blob/master/src/main/java/io/vertx/ext/eventbus/bridge/tcp/impl/protocol/FrameParser.java 它完全符合我想要实现的目标:)

我稍微修改了一下,转换为 Kotlin,并制作了它,以便我可以控制标头大小和它提取有效载荷长度的方式。

以下是使用 Vert.x NetServer 控制读取流的粗略快速和脏示例:

suspend fun main() {
val vertx = Vertx.vertx()
initServer(vertx)
initClient(vertx)
}
suspend fun initServer(vertx: Vertx) {
val server = vertx.createNetServer(netServerOptionsOf(port = 8888, host = "localhost"))
server
.connectHandler { socket ->
val parser = FrameParser(
headerSize = 4,
headerHandler = {
it.getInt(0)
},
handler = {
println(it.toString())
println("---")
}
)
socket.handler(parser)
socket.exceptionHandler {
it.printStackTrace()
socket.close()
}
}
.listenAwait()
}
suspend fun initClient(vertx: Vertx) {
val client = vertx.createNetClient()
val socket = client.connectAwait(port = 8888, host = "localhost")
val message = "START|${"foobarfoobar".repeat(100)}|END"
val length = message.length
repeat(5) {
repeat(100) {
vertx.setPeriodic(10) {
socket.write(
Buffer.buffer()
.appendInt(length)
.appendString(message)
)
}
}
delay(1000)
}
}
/**
* Based on: https://github.com/vert-x3/vertx-tcp-eventbus-bridge/blob/master/src/main/java/io/vertx/ext/eventbus/bridge/tcp/impl/protocol/FrameParser.java
*/
class FrameParser(
private val headerSize: Int,
private val headerHandler: (Buffer) -> Int,
private val handler: (Buffer) -> Unit
) : Handler<Buffer?> {
private var _buffer: Buffer? = null
private var _offset = 0
override fun handle(buffer: Buffer?) {
append(buffer)
var offset: Int
while (true) {
// set a rewind point. if a failure occurs,
// wait for the next handle()/append() and try again
offset = _offset
// how many bytes are in the buffer
val remainingBytes = bytesRemaining()
// at least expected header size
if (remainingBytes < headerSize) {
break
}
// what is the length of the message
val length: Int = headerHandler(_buffer!!.getBuffer(_offset, _offset + headerSize))
_offset += headerSize
if (remainingBytes - headerSize >= length) {
// we have a complete message
handler(_buffer!!.getBuffer(_offset, _offset + length))
_offset += length
} else {
// not enough data: rewind, and wait
// for the next packet to appear
_offset = offset
break
}
}
}
private fun append(newBuffer: Buffer?) {
if (newBuffer == null) {
return
}
// first run
if (_buffer == null) {
_buffer = newBuffer
return
}
// out of data
if (_offset >= _buffer!!.length()) {
_buffer = newBuffer
_offset = 0
return
}
// very large packet
if (_offset > 0) {
_buffer = _buffer!!.getBuffer(_offset, _buffer!!.length())
}
_buffer!!.appendBuffer(newBuffer)
_offset = 0
}
private fun bytesRemaining(): Int {
return if (_buffer!!.length() - _offset < 0) {
0
} else {
_buffer!!.length() - _offset
}
}
}

最新更新