带有ServiceActivator和JdbcMessageHandler的RetryTemplate



我正在使用spring集成将数据从源拉入数据库。

JdbcMessageHandler用于处理数据库插入,

是否有可能将ServiceActivator附加到RetryTemplate(与ExponentialBackOffPolicy),以便重试,如果有db连接相关的错误?

尝试这个

@ServiceActivator(inputChannel = "DatabaseOutput", adviceChain = {"retryAdvice"})
public MessageHandler jdbcMessageHandler() {
JdbcMessageHandler jdbcMessageHandler = new JdbcMessageHandler(getDataSource(),...);
}


@Bean
public RetryTemplate retryTemplate() {
RetryTemplate retryTemplate = new RetryTemplate();
...
return retryTemplate;
}
@Bean
public RequestHandlerRetryAdvice retryAdvice(RetryTemplate retryTemplate) {
RequestHandlerRetryAdvice advice = new RequestHandlerRetryAdvice();
advice.setRetryTemplate(retryTemplate);
return advice;
}

当我启动应用程序时,我给了它错误的主机,并期望它尝试n次,其中n是我在RetryTemplate上设置的retryPolicy.setMaxAttempts(99)

但是它立即失败了

部分堆栈跟踪:

org.postgresql.util.PSQLException: Connection to localhost:5431 refused. Check that the hostname and port are correct and that the postmaster is accepting TCP/IP connections.
at org.postgresql.core.v3.ConnectionFactoryImpl.openConnectionImpl(ConnectionFactoryImpl.java:319)
at org.postgresql.core.ConnectionFactory.openConnection(ConnectionFactory.java:49)
at org.postgresql.jdbc.PgConnection.<init>(PgConnection.java:223)
at org.postgresql.Driver.makeConnection(Driver.java:402)
at org.postgresql.Driver.connect(Driver.java:261)
at com.zaxxer.hikari.util.DriverDataSource.getConnection(DriverDataSource.java:121)
at com.zaxxer.hikari.pool.PoolBase.newConnection(PoolBase.java:364)
at com.zaxxer.hikari.pool.PoolBase.newPoolEntry(PoolBase.java:206)
at com.zaxxer.hikari.pool.HikariPool.createPoolEntry(HikariPool.java:476)
at com.zaxxer.hikari.pool.HikariPool.checkFailFast(HikariPool.java:561)
at com.zaxxer.hikari.pool.HikariPool.<init>(HikariPool.java:115)
at com.zaxxer.hikari.HikariDataSource.getConnection(HikariDataSource.java:112)
at org.springframework.jdbc.datasource.DataSourceUtils.fetchConnection(DataSourceUtils.java:159)
at org.springframework.jdbc.datasource.DataSourceUtils.doGetConnection(DataSourceUtils.java:117)
at org.springframework.jdbc.datasource.DataSourceUtils.getConnection(DataSourceUtils.java:80)
at org.springframework.jdbc.support.JdbcUtils.extractDatabaseMetaData(JdbcUtils.java:337)
at org.springframework.boot.jdbc.DatabaseDriver.fromDataSource(DatabaseDriver.java:329)

在解决了这个问题后,它一直在不停地尝试连接。

org.postgresql.util.PSQLException: Connection to localhost:5431 refused. Check that the hostname and port are correct and that the postmaster is accepting TCP/IP connections.
at org.postgresql.core.v3.ConnectionFactoryImpl.openConnectionImpl(ConnectionFactoryImpl.java:319)
at org.postgresql.core.ConnectionFactory.openConnection(ConnectionFactory.java:49)
at org.postgresql.jdbc.PgConnection.<init>(PgConnection.java:223)
at org.postgresql.Driver.makeConnection(Driver.java:402)
at org.postgresql.Driver.connect(Driver.java:261)
at com.zaxxer.hikari.util.DriverDataSource.getConnection(DriverDataSource.java:121)
at com.zaxxer.hikari.pool.PoolBase.newConnection(PoolBase.java:364)
at com.zaxxer.hikari.pool.PoolBase.newPoolEntry(PoolBase.java:206)
at com.zaxxer.hikari.pool.HikariPool.createPoolEntry(HikariPool.java:476)
at com.zaxxer.hikari.pool.HikariPool.checkFailFast(HikariPool.java:561)
at com.zaxxer.hikari.pool.HikariPool.<init>(HikariPool.java:115)
at com.zaxxer.hikari.HikariDataSource.getConnection(HikariDataSource.java:112)
at org.springframework.jdbc.datasource.DataSourceUtils.fetchConnection(DataSourceUtils.java:159)
at org.springframework.jdbc.datasource.DataSourceUtils.doGetConnection(DataSourceUtils.java:117)
at org.springframework.jdbc.datasource.DataSourceUtils.getConnection(DataSourceUtils.java:80)
at org.springframework.jdbc.core.JdbcTemplate.execute(JdbcTemplate.java:646)
at org.springframework.jdbc.core.JdbcTemplate.execute(JdbcTemplate.java:691)
at org.springframework.jdbc.core.JdbcTemplate.batchUpdate(JdbcTemplate.java:1034)
at org.springframework.integration.jdbc.JdbcMessageHandler.executeUpdateQuery(JdbcMessageHandler.java:233)
at org.springframework.integration.jdbc.JdbcMessageHandler.handleMessageInternal(JdbcMessageHandler.java:175)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:56)
at org.springframework.integration.handler.ReplyProducingMessageHandlerWrapper.handleRequestMessage(ReplyProducingMessageHandlerWrapper.java:59)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler$AdvisedRequestHandler.handleRequestMessage(AbstractReplyProducingMessageHandler.java:208)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:344)
at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:198)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)
at org.springframework.integration.handler.advice.AbstractRequestHandlerAdvice$CallbackImpl.cloneAndExecute(AbstractRequestHandlerAdvice.java:166)
at org.springframework.integration.handler.advice.RequestHandlerRetryAdvice.lambda$doInvoke$1(RequestHandlerRetryAdvice.java:86)
at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:329)
at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:255)
at org.springframework.integration.handler.advice.RequestHandlerRetryAdvice.doInvoke(RequestHandlerRetryAdvice.java:86)
at org.springframework.integration.handler.advice.AbstractRequestHandlerAdvice.invoke(AbstractRequestHandlerAdvice.java:67)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:215)
at com.sun.proxy.$Proxy190.handleRequestMessage(Unknown Source)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.doInvokeAdvisedRequestHandler(AbstractReplyProducingMessageHandler.java:155)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:139)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:56)
at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115)
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:133)
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106)
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:317)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:272)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:457)
at org.springframework.integration.handler.AbstractMessageProducingHandler.doProduceOutput(AbstractMessageProducingHandler.java:325)
at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:268)
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:232)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:142)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:56)
at org.springframework.integration.dispatcher.BroadcastingDispatcher.invokeHandler(BroadcastingDispatcher.java:222)
at org.springframework.integration.dispatcher.BroadcastingDispatcher.dispatch(BroadcastingDispatcher.java:178)
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:317)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:272)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:216)
at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.sendMessageIfAny(KafkaMessageDrivenChannelAdapter.java:397)
at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.access$300(KafkaMessageDrivenChannelAdapter.java:83)
at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationBatchMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:550)
at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationBatchMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:518)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeBatchOnMessage(KafkaMessageListenerContainer.java:2274)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeBatchOnMessageWithRecordsOrList(KafkaMessageListenerContainer.java:2264)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeBatchOnMessage(KafkaMessageListenerContainer.java:2207)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeBatchListener(KafkaMessageListenerContainer.java:2117)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeBatchListener(KafkaMessageListenerContainer.java:2000)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:1979)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeIfHaveRecords(KafkaMessageListenerContainer.java:1366)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1357)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1252)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:264)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.net.ConnectException: Connection refused (Connection refused)
at java.base/java.net.PlainSocketImpl.socketConnect(Native Method)
at java.base/java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:399)
at java.base/java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:242)
at java.base/java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:224)
at java.base/java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
at java.base/java.net.Socket.connect(Socket.java:609)
at org.postgresql.core.PGStream.createSocket(PGStream.java:241)
at org.postgresql.core.PGStream.<init>(PGStream.java:98)
at org.postgresql.core.v3.ConnectionFactoryImpl.tryConnect(ConnectionFactoryImpl.java:109)
at org.postgresql.core.v3.ConnectionFactoryImpl.openConnectionImpl(ConnectionFactoryImpl.java:235)
... 84 common frames omitted

at org.springframework.boot.jdbc.DatabaseDriver.fromDataSource(DatabaseDriver.java:329)

这与你的JdbcMessageHandler无关。它在您尝试发送消息之前就失败了:在启动时,Spring Boot试图通过脚本初始化数据库。目前你只是不告诉Spring Boot你使用PostgreSQL,所以它试图通过DataSource来解决它。即使你说它仍然会失败,因为Spring Boot不能用相应的脚本初始化DB。

请看看是否可以模拟一些其他错误,让应用程序传递给消息交换器。

最新更新