Spring Integration TCP Inbound Adapter客户端断开连接问题



我们已经为入站TCP服务器创建了一个IntegrationFlow。每当我们对此进行测试时,集成都很好,直到客户端断开连接,然后我们就会得到连续的空消息流。

弹簧集成5.2.5.RELEASE

流量:

@Bean
public IntegrationFlow inboundFlow(MyService myService) {
return IntegrationFlows.from(
Tcp.inboundAdapter(
Tcp.netServer(12345)
.deserializer(TcpCodecs.lengthHeader4())))
.transform(Transformers.objectToString())
.log(INFO) // Logging only for testing 
.handle(myService, "process") // Do some processing of the message
.get();
}

测试方法:

public void sendPackets() throws Exception {
List<Path> files = getAllFilesSorted("/some/location");
try (Socket socket = new Socket("localhost", 12345)) {
log.info("local port: {}", socket.getLocalPort());
try (OutputStream outputStream = socket.getOutputStream()) {
for (int i = 0; i < 10; i++) {
Path path = files.get(i);
log.info("{} ({} of {})", path.toString(), i, files.size());
byte[] bytes = Files.readAllBytes(path);
outputStream.write(bytes);
outputStream.flush();
Thread.sleep(200);
}
}
}
}

测试完成后的部分日志:

2020-05-05 11:00:52.963  INFO 31793 --- [pool-1-thread-2] o.s.integration.handler.LoggingHandler   : GenericMessage [payload=, headers={ip_tcp_remotePort=59407, ip_connectionId=localhost:59407:44745:df23388b-b8b0-4250-b926-4199030b2ff6, ip_localInetAddress=/127.0.0.1, ip_address=127.0.0.1, id=813ccf6b-9eb7-2ff3-0f8d-f2810966d8d7, ip_hostname=localhost, timestamp=1588672852963}]
2020-05-05 11:00:52.963  INFO 31793 --- [pool-1-thread-2] o.s.integration.handler.LoggingHandler   : GenericMessage [payload=, headers={ip_tcp_remotePort=59407, ip_connectionId=localhost:59407:44745:df23388b-b8b0-4250-b926-4199030b2ff6, ip_localInetAddress=/127.0.0.1, ip_address=127.0.0.1, id=1fd5c4c7-b2e0-6a89-7de6-894c8e42e242, ip_hostname=localhost, timestamp=1588672852963}]
2020-05-05 11:00:52.963  INFO 31793 --- [pool-1-thread-2] o.s.integration.handler.LoggingHandler   : GenericMessage [payload=, headers={ip_tcp_remotePort=59407, ip_connectionId=localhost:59407:44745:df23388b-b8b0-4250-b926-4199030b2ff6, ip_localInetAddress=/127.0.0.1, ip_address=127.0.0.1, id=78b852e8-0094-f8b9-8d05-37bc4912d096, ip_hostname=localhost, timestamp=1588672852963}]
2020-05-05 11:00:52.963  INFO 31793 --- [pool-1-thread-2] o.s.integration.handler.LoggingHandler   : GenericMessage [payload=, headers={ip_tcp_remotePort=59407, ip_connectionId=localhost:59407:44745:df23388b-b8b0-4250-b926-4199030b2ff6, ip_localInetAddress=/127.0.0.1, ip_address=127.0.0.1, id=4e4a7ee0-c66e-9501-9ef7-4d25143531b3, ip_hostname=localhost, timestamp=1588672852963}]
2020-05-05 11:00:52.963  INFO 31793 --- [pool-1-thread-2] o.s.integration.handler.LoggingHandler   : GenericMessage [payload=, headers={ip_tcp_remotePort=59407, ip_connectionId=localhost:59407:44745:df23388b-b8b0-4250-b926-4199030b2ff6, ip_localInetAddress=/127.0.0.1, ip_address=127.0.0.1, id=c90abe17-7036-94cc-af59-3b0cac5e75e4, ip_hostname=localhost, timestamp=1588672852963}]
2020-05-05 11:00:52.963  INFO 31793 --- [pool-1-thread-2] o.s.integration.handler.LoggingHandler   : GenericMessage [payload=, headers={ip_tcp_remotePort=59407, ip_connectionId=localhost:59407:44745:df23388b-b8b0-4250-b926-4199030b2ff6, ip_localInetAddress=/127.0.0.1, ip_address=127.0.0.1, id=425d2e2a-75c5-0184-06da-6d27eb546931, ip_hostname=localhost, timestamp=1588672852963}]
2020-05-05 11:00:52.965  INFO 31793 --- [pool-1-thread-2] o.s.integration.handler.LoggingHandler   : GenericMessage [payload=, headers={ip_tcp_remotePort=59407, ip_connectionId=localhost:59407:44745:df23388b-b8b0-4250-b926-4199030b2ff6, ip_localInetAddress=/127.0.0.1, ip_address=127.0.0.1, id=4c55d1d0-7421-1832-23ca-03744419c847, ip_hostname=localhost, timestamp=1588672852965}]
2020-05-05 11:00:52.965  INFO 31793 --- [pool-1-thread-2] o.s.integration.handler.LoggingHandler   : GenericMessage [payload=, headers={ip_tcp_remotePort=59407, ip_connectionId=localhost:59407:44745:df23388b-b8b0-4250-b926-4199030b2ff6, ip_localInetAddress=/127.0.0.1, ip_address=127.0.0.1, id=05b6fea6-8f76-5c74-e4cc-33bd8099d8e8, ip_hostname=localhost, timestamp=1588672852965}]
2020-05-05 11:00:52.965  INFO 31793 --- [pool-1-thread-2] o.s.integration.handler.LoggingHandler   : GenericMessage [payload=, headers={ip_tcp_remotePort=59407, ip_connectionId=localhost:59407:44745:df23388b-b8b0-4250-b926-4199030b2ff6, ip_localInetAddress=/127.0.0.1, ip_address=127.0.0.1, id=9b8fe16c-07f4-b64b-faec-794eb743559c, ip_hostname=localhost, timestamp=1588672852965}]
2020-05-05 11:00:52.965  INFO 31793 --- [pool-1-thread-2] o.s.integration.handler.LoggingHandler   : GenericMessage [payload=, headers={ip_tcp_remotePort=59407, ip_connectionId=localhost:59407:44745:df23388b-b8b0-4250-b926-4199030b2ff6, ip_localInetAddress=/127.0.0.1, ip_address=127.0.0.1, id=222725e8-19c6-62b6-8ba7-103892191c96, ip_hostname=localhost, timestamp=1588672852965}]
2020-05-05 11:00:52.965  INFO 31793 --- [pool-1-thread-2] o.s.integration.handler.LoggingHandler   : GenericMessage [payload=, headers={ip_tcp_remotePort=59407, ip_connectionId=localhost:59407:44745:df23388b-b8b0-4250-b926-4199030b2ff6, ip_localInetAddress=/127.0.0.1, ip_address=127.0.0.1, id=951403b6-18a9-ffe9-29b7-67b8c01ea311, ip_hostname=localhost, timestamp=1588672852965}]
2020-05-05 11:00:52.965  INFO 31793 --- [pool-1-thread-2] o.s.integration.handler.LoggingHandler   : GenericMessage [payload=, headers={ip_tcp_remotePort=59407, ip_connectionId=localhost:59407:44745:df23388b-b8b0-4250-b926-4199030b2ff6, ip_localInetAddress=/127.0.0.1, ip_address=127.0.0.1, id=2ac3002b-ccd1-9506-f3aa-60f4b63e26b8, ip_hostname=localhost, timestamp=1588672852965}]
2020-05-05 11:00:52.965  INFO 31793 --- [pool-1-thread-2] o.s.integration.handler.LoggingHandler   : GenericMessage [payload=, headers={ip_tcp_remotePort=59407, ip_connectionId=localhost:59407:44745:df23388b-b8b0-4250-b926-4199030b2ff6, ip_localInetAddress=/127.0.0.1, ip_address=127.0.0.1, id=dbbf12cf-0a27-407b-0a96-5b27ab0539db, ip_hostname=localhost, timestamp=1588672852965}]
2020-05-05 11:00:52.965  INFO 31793 --- [pool-1-thread-2] o.s.integration.handler.LoggingHandler   : GenericMessage [payload=, headers={ip_tcp_remotePort=59407, ip_connectionId=localhost:59407:44745:df23388b-b8b0-4250-b926-4199030b2ff6, ip_localInetAddress=/127.0.0.1, ip_address=127.0.0.1, id=3282d09e-9508-b053-3763-cc23d3c817a8, ip_hostname=localhost, timestamp=1588672852965}]
2020-05-05 11:00:52.966  INFO 31793 --- [pool-1-thread-2] o.s.integration.handler.LoggingHandler   : GenericMessage [payload=, headers={ip_tcp_remotePort=59407, ip_connectionId=localhost:59407:44745:df23388b-b8b0-4250-b926-4199030b2ff6, ip_localInetAddress=/127.0.0.1, ip_address=127.0.0.1, id=549b2326-341f-19c0-db98-6b31da1901d8, ip_hostname=localhost, timestamp=1588672852966}]

日志中没有任何关于客户端断开连接的内容。我们会在日志中看到套接字关闭活动。

日志表明我们正在接收包含0x00000000的数据包。

使用wireshark或类似工具查看电线上的活动。

编辑

您的自定义反序列化程序导致了此问题。

if (read < 0) {
return 0;
}

当检测到套接字关闭时,需要抛出一个SoftEndOfStreamException

文档中对此进行了解释。

当反序列化程序检测到消息之间有一个关闭的输入流时,它必须抛出一个SoftEndOfStreamException;这是对框架的一个信号,表明收盘是"正常的"。如果在解码消息时关闭了流,则应该引发其他异常。

最新更新