Spring Cloud RabbitMQ如何拦截重新发布到DLQ的消息?



我想拦截在重试限制耗尽后重新发布到DLQ的消息,我的最终目标是从这些消息中消除x-exception-stacktrace标头。

配置:

spring:
application:
name: sandbox
cloud:
function:
definition: rabbitTest1Input
stream:
binders:
rabbitTestBinder1:
type: rabbit
environment:
spring:
rabbitmq:
addresses: localhost:55015
username: guest
password: guest
virtual-host: test

bindings:
rabbitTest1Input-in-0:
binder: rabbitTestBinder1
consumer:
max-attempts: 3
destination: ex1
group: q1
rabbit:
bindings:
rabbitTest1Input-in-0:
consumer:
autoBindDlq: true
bind-queue: true
binding-routing-key: q1key
deadLetterExchange: ex1-DLX
dlqDeadLetterExchange: ex1
dlqDeadLetterRoutingKey: q1key_dlq
dlqTtl: 180000
prefetch: 5
queue-name-group-only: true
republishToDlq: true
requeueRejected: false
ttl: 86400000
@Configuration
class ConsumerConfig {
companion object : KLogging()
@Bean
fun rabbitTest1Input(): Consumer<Message<String>> {
return Consumer {
logger.info("Received from test1 queue: ${it.payload}")
throw AmqpRejectAndDontRequeueException("FAILED")  // force republishing to DLQ after N retries
}
}
}

首先我试图注册@GlobalChannelInterceptor(像这里),但由于RabbitMessageChannelBinder使用自己的私有RabbitTemplate实例(不自动连接)重新发布(见#getErrorMessageHandler)它不会被截获。

然后我试图通过丢弃与x-exception-stacktrace相关的代码来扩展RabbitMessageChannelBinder类,然后将此扩展声明为bean:

/**
* Forked from {@link org.springframework.cloud.stream.binder.rabbit.RabbitMessageChannelBinder} with the goal
* to eliminate {@link RepublishMessageRecoverer.X_EXCEPTION_STACKTRACE} header from messages republished to DLQ
*/
class RabbitMessageChannelBinderWithNoStacktraceRepublished 
: RabbitMessageChannelBinder(...)
// and then
@Configuration
@Import(
RabbitAutoConfiguration::class,
RabbitServiceAutoConfiguration::class,
RabbitMessageChannelBinderConfiguration::class,
PropertyPlaceholderAutoConfiguration::class,
)
@EnableConfigurationProperties(
RabbitProperties::class,
RabbitBinderConfigurationProperties::class,
RabbitExtendedBindingProperties::class
)
class RabbitConfig {
@Bean
@Primary
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
@Order(Ordered.HIGHEST_PRECEDENCE)
fun customRabbitMessageChannelBinder(
appCtx: ConfigurableApplicationContext,
... // required injections
): RabbitMessageChannelBinder {
// remove the original (auto-configured) bean. Explanation is after the code snippet
val registry = appCtx.autowireCapableBeanFactory as BeanDefinitionRegistry
registry.removeBeanDefinition("rabbitMessageChannelBinder")
// ... and replace it with custom binder. It's initialized absolutely the same way as original bean, but is of forked class
return RabbitMessageChannelBinderWithNoStacktraceRepublished(...)
}
}

但是在这种情况下,我的通道绑定器不尊重YAML属性(例如addresses: localhost:55015),并使用默认值(例如localhost:5672)

INFO  o.s.a.r.c.CachingConnectionFactory - Attempting to connect to: [localhost:5672]
INFO  o.s.a.r.l.SimpleMessageListenerContainer - Broker not available; cannot force queue declarations during start: java.net.ConnectException: Connection refused

另一方面,如果我不从Spring上下文中删除原始粘合剂,我会得到以下错误:

Caused by: java.lang.IllegalStateException: Multiple binders are available, however neither default nor per-destination binder name is provided. Available binders are [rabbitMessageChannelBinder, customRabbitMessageChannelBinder]
at org.springframework.cloud.stream.binder.DefaultBinderFactory.getBinder(DefaultBinderFactory.java:145)

谁能给我一个提示如何解决这个问题?

注:我使用Spring Cloud Stream 3.1.6和Spring Boot 2.6.6

  1. 禁用绑定器重试/DLQ配置(maxAttempts=1republishToDlq=false和其他DLQ相关属性)
  2. 添加ListenerContainerCustomizer,在建议链中添加自定义重试建议,并带有自定义死信发布恢复器。
  3. 使用Queue@Bean手动分配DLQ
@SpringBootApplication
public class So72871662Application {
public static void main(String[] args) {
SpringApplication.run(So72871662Application.class, args);
}
@Bean
public Consumer<String> input() {
return str -> {
System.out.println();
throw new RuntimeException("test");
};
}
@Bean
ListenerContainerCustomizer<MessageListenerContainer> customizer(RetryOperationsInterceptor retry) {
return (cont, dest, grp) -> {
((AbstractMessageListenerContainer) cont).setAdviceChain(retry);
};
}
@Bean
RetryOperationsInterceptor interceptor(MessageRecoverer recoverer) {
return RetryInterceptorBuilder.stateless()
.maxAttempts(3)
.backOffOptions(3_000L, 2.0, 10_000L)
.recoverer(recoverer)
.build();
}
@Bean
MessageRecoverer recoverer(RabbitTemplate template) {
return new RepublishMessageRecoverer(template, "DLX", "errors") {
@Override
protected void doSend(@Nullable
String exchange, String routingKey, Message message) {
message.getMessageProperties().getHeaders().remove(RepublishMessageRecoverer.X_EXCEPTION_STACKTRACE);
super.doSend(exchange, routingKey, message);
}
};
}
@Bean
FanoutExchange dlx() {
return new FanoutExchange("DLX");
}
@Bean
Queue dlq() {
return new Queue("errors");
}
@Bean
Binding dlqb() {
return BindingBuilder.bind(dlq()).to(dlx());
}
}

最新更新