出站适配器尝试在客户端已断开连接后发送消息



我使用Spring Integration处理数据,然后返回响应。我用几个通道设置了入站适配器和出站适配器,以处理初始ACK、处理数据并向客户端返回响应。

当客户端断开连接时,我们会触发一个额外的进程,该进程会运行他们在数据库中拥有的所有数据。

问题是,当客户端发送数据,我们开始处理它,然后客户端断开连接时,出站适配器(?(会尝试将该进程的响应发送回客户端。这会导致一个异常,并中断我们在断开连接后启动的附加进程的流程。

在我的本地机器上运行它运行得很好。当将其部署到运行Corretto11Amazon Linux 2实例上的AWS Elastic Beanstalk时,它将失败,并显示以下错误。

我不确定是否有办法在尝试发送之前检查客户端是否仍然连接。我尝试使用错误通道,但它没有捕捉到它。我是Spring Integration的新手,所以非常感谢任何帮助。

谢谢。

错误日志(用XXXX替换客户端IP(:

Nov 17 16:19:38 ip-172-31-40-195 web: 2020-11-17 16:19:38.985  INFO 31923 --- [pool-4-thread-2] c.o.v.service.DefaultVoiceDataProcessor  : Client XXXXXXX disconnected.
Nov 17 16:19:38 ip-172-31-40-195 web: 2020-11-17 16:19:38.986  WARN 31923 --- [pool-4-thread-2] o.s.i.i.tcp.connection.TcpNetConnection  : Failed to publish TcpConnectionCloseEvent [source=TcpNetConnection:XXXXXX], [factory=serverConnectionFactory, connectionId=XXXXXX] **CLOSED**:null
Nov 17 16:19:39 ip-172-31-40-195 web: 2020-11-17 16:19:39.001 ERROR 31923 --- [pool-4-thread-2] o.s.i.ip.tcp.TcpSendingMessageHandler    : Error sending message
Nov 17 16:19:39 ip-172-31-40-195 web: org.springframework.messaging.MessagingException: Send Failed; nested exception is java.net.SocketException: Connection reset by peer (Write failed)
Nov 17 16:19:39 ip-172-31-40-195 web: at org.springframework.integration.ip.tcp.connection.TcpNetConnection.send(TcpNetConnection.java:118) ~[spring-integration-ip-5.3.2.RELEASE.jar!/:5.3.2.RELEASE]
Nov 17 16:19:39 ip-172-31-40-195 web: at org.springframework.integration.ip.tcp.TcpSendingMessageHandler.handleMessageAsServer(TcpSendingMessageHandler.java:119) ~[spring-integration-ip-5.3.2.RELEASE.jar!/:5.3.2.RELEASE]
Nov 17 16:19:39 ip-172-31-40-195 web: at org.springframework.integration.ip.tcp.TcpSendingMessageHandler.handleMessageInternal(TcpSendingMessageHandler.java:103) ~[spring-integration-ip-5.3.2.RELEASE.jar!/:5.3.2.RELEASE]
Nov 17 16:19:39 ip-172-31-40-195 web: at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:62) ~[spring-integration-core-5.3.2.RELEASE.jar!/:5.3.2.RELEASE]
Nov 17 16:19:39 ip-172-31-40-195 web: at org.springframework.integration.handler.ReplyProducingMessageHandlerWrapper.handleRequestMessage(ReplyProducingMessageHandlerWrapper.java:58) ~[spring-integration-core-5.3.2.RELEASE.jar!/:5.3.2.RELEASE]

适配器配置设置:

@Bean
fun serverConnectionFactory(byteArrayCrlfSerializer: ByteArrayCrLfSerializer): AbstractServerConnectionFactory =
TcpNetServerConnectionFactory(port).apply {
deserializer = byteArrayCrlfSerializer
soSendBufferSize = MAX_MESSAGE_SIZE + CLRF_SIZE
soReceiveBufferSize = MAX_MESSAGE_SIZE + CLRF_SIZE
}
@Bean
fun inboundAdapter(serverConnectionFactory: AbstractServerConnectionFactory) =
TcpReceivingChannelAdapter().apply {
setConnectionFactory(serverConnectionFactory)
setOutputChannelName("sendAcknowledgement")
}
@Bean
@ServiceActivator(inputChannel = "sendResponse")
fun outboundAdapter(serverConnectionFactory: AbstractServerConnectionFactory) =
TcpSendingMessageHandler().apply {
setConnectionFactory(serverConnectionFactory)
}

@MessagingGateway(defaultRequestChannel = "sendResponse", errorChannel = "outboundErrorChannel")
interface OutboundMessageGateway {
@Throws(MessagingException::class)
fun send(@Payload message: String, @Header(IpHeaders.CONNECTION_ID) connectionId: String)
}

TCP消息端点:

@ServiceActivator(inputChannel = "sendAcknowledgement", outputChannel = "doProcessing")
fun initialAck(message: Message<ByteArray>): Message<ByteArray> {
gateway.send("ACK", message.headers[IpHeaders.CONNECTION_ID].toString())
return message
}
@ServiceActivator(inputChannel = "doProcessing", outputChannel = "sendResponse")
fun process(message: Message<ByteArray>, @Header(IpHeaders.CONNECTION_ID) connectionId: String): String? {
/*** our process logic here ***/
}
@ServiceActivator(inputChannel = "outboundErrorChannel")
fun handleError(message: MessagingException) {
log.error("error: %s", message)
}
override fun onApplicationEvent(event: TcpConnectionEvent) {
when (event) {
is TcpConnectionOpenEvent -> {
log.debug { "${event.connectionId} is attempting to connect" }
}
is TcpConnectionCloseEvent -> {
/*** kick off additional process here ***/
}
}
}

您的TcpSendingMessageHandler可以使用Advice:配置的@ServiceActivator

/**
* Specify a "chain" of {@code Advice} beans that will "wrap" the message handler.
* Only the handler is advised, not the downstream flow.
* @return the advice chain.
*/
String[] adviceChain() default { };

您可以在其中选择一种开箱即用的实现:https://docs.spring.io/spring-integration/docs/current/reference/html/messaging-endpoints.html#message-操作员建议链

但可能对您来说最好的是ExpressionEvaluatingRequestHandlerAdvice,因此您将能够捕获异常并对其进行处理,而不会破坏之前的整个流程。