spring集成amqp出站适配器竞争状况



在我们的一个生产应用程序中,我们有一个相当复杂的spring集成amqp用例,并且我们在启动时看到了一些"org.springframework.integration.MessageDispatchingException:Dispatcher没有订阅者"异常。在启动时出现初始错误之后,我们再也看不到来自相同组件的异常了。对于依赖AMQP出站适配器并最终在生命周期早期使用它们的组件来说,这似乎是某种启动竞争条件。

我可以通过调用网关来重现这一点,该网关将发送到连接到PostConstruct方法中的出站适配器的通道。

配置:

package gadams;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.annotation.IntegrationComponentScan;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.amqp.Amqp;
import org.springframework.integration.dsl.channel.MessageChannels;
import org.springframework.messaging.MessageChannel;
@SpringBootApplication
@IntegrationComponentScan
public class RabbitRace {
public static void main(String[] args) {
SpringApplication.run(RabbitRace.class, args);
}
@Bean(name = "HelloOut")
public MessageChannel channelHelloOut() {
return MessageChannels.direct().get();
}
@Bean
public Queue queueHello() {
return new Queue("hello.q");
}
@Bean(name = "helloOutFlow")
public IntegrationFlow flowHelloOutToRabbit(RabbitTemplate rabbitTemplate) {
return IntegrationFlows.from("HelloOut").handle(Amqp.outboundAdapter(rabbitTemplate).routingKey("hello.q"))
.get();
}
}

网关:

package gadams;
import org.springframework.integration.annotation.Gateway;
import org.springframework.integration.annotation.MessagingGateway;
@MessagingGateway
public interface HelloGateway {
@Gateway(requestChannel = "HelloOut")
void sendMessage(String message);
}

组件:

package gadams;
import javax.annotation.PostConstruct;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.DependsOn;
import org.springframework.stereotype.Component;
@Component
@DependsOn("helloOutFlow")
public class HelloPublisher {
@Autowired
private HelloGateway helloGateway;
@PostConstruct
public void postConstruct() {
helloGateway.sendMessage("hello");
}
}

在我的生产用例中,我们有一个带有PostConstruct方法的组件,其中我们使用TaskScheduler来调度一堆组件,其中一些组件依赖于AMQP出站适配器,其中一些最终会立即执行。我曾尝试在涉及出站适配器的IntegrationFlow上放置bean名称,并在使用网关和/或网关本身的bean上使用@DependsOn,但这并不能消除启动时的错误。

一切都称为Lifecycle。任何Spring Integration端点只有在执行其start()时才开始侦听或生成消息。

通常,对于标准默认autoStartup = true,它在ApplicationContext.finishRefresh();中作为进行

// Propagate refresh to lifecycle processor first.
getLifecycleProcessor().onRefresh();

@PostConstruct(afterPropertiesSet())开始向信道生成消息实际上非常早,因为它离finishRefresh()很远。

您真的应该在SmartLifecycle.start()阶段重新考虑您的生产逻辑和实现。

请参阅参考手册中的更多信息。