ActiveMq,springBoot - 立即处理队列中发送的每条消息



我正在使用ActiveMq和SpringBoot,将每条记录从大型csv文件发送到另一个服务。我正在将记录加载到地图中,然后在每个循环中将记录发送到ActiveMq队列。

我的问题是,ActiveMq 不会让任何消费者从队列中获取记录,直到我的地图中的所有记录都被发送到 ActiveMq。

我可以将 ActiveMq 配置为允许在放入队列后立即使用消息(而不是用于某种提交事务(吗?

这是我的ActiveMq配置:

@EnableJms
@Configuration
public class JmsConfig implements JmsListenerConfigurer {
@Autowired
private JmsErrorHandler jmsErrorHandler;
@Autowired
private MessageConverter messageConverter;
@Autowired
private DefaultJmsListenerContainerFactory defaultJmsListenerContainerFactory;
@Autowired
private DefaultMessageHandlerMethodFactory handlerMethodFactory;
@Autowired
private JsonMessageConverter jsonMessageConverter;
@Value("${spring.activemq.broker-url}")
private String brokerUrl;
@Value("${spring.activemq.user}")
private String username;
@Value("${spring.activemq.password}")
private String password;
@Bean
public ActiveMQConnectionFactory activeMQConnectionFactory() {
    ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(username, password, brokerUrl);
    activeMQConnectionFactory.setUseAsyncSend(true);
    return activeMQConnectionFactory;
}
@Bean
public DefaultJmsListenerContainerFactory defaultJmsListenerContainerFactory() {
    return new DefaultJmsListenerContainerFactory();
}
@Bean
public JmsListenerContainerFactory jmsListenerContainerFactory(ActiveMQConnectionFactory activeMQConnectionFactory,
                                                                  DefaultJmsListenerContainerFactoryConfigurer configurer) {
    defaultJmsListenerContainerFactory.setErrorHandler(jmsErrorHandler);
    configurer.configure(defaultJmsListenerContainerFactory, activeMQConnectionFactory);
    return defaultJmsListenerContainerFactory;
}
@Bean
public DefaultMessageHandlerMethodFactory handlerMethodFactory() {
    DefaultMessageHandlerMethodFactory factory = new DefaultMessageHandlerMethodFactory();
    factory.setMessageConverter(messageConverter);
    return factory;
}
@Bean
public MessageConverter messageConverter() {
    MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
    converter.setObjectMapper(createJacksonObjectMapper());
    return converter;
}
@NotNull
private ObjectMapper createJacksonObjectMapper() {
    return Jackson2ObjectMapperBuilder
            .json()
            .modules(new JavaTimeModule())
            .build();
}
@Override
public void configureJmsListeners(@NotNull JmsListenerEndpointRegistrar registrar) {
    registrar.setMessageHandlerMethodFactory(handlerMethodFactory);
}
@Bean
public JmsTemplate createJmsTemplate(ActiveMQConnectionFactory activeMQConnectionFactory) {
    JmsTemplate jmsTemplate = new JmsTemplate();
    jmsTemplate.setMessageConverter(jsonMessageConverter);
    jmsTemplate.setConnectionFactory(activeMQConnectionFactory);
    jmsTemplate.setDeliveryPersistent(false);
    return jmsTemplate;
}

}

我正在使用以下代码发送消息:

   public void sendRecordToLogbook(Record record) {
    jmsTemplate.convertAndSend(logbookDestination, record);
}

您使用活动 MQ 所做的同样的事情,我正在使用 AWS SQS 做同样的事情。在 SQS 中,队列消息可以在我们写入时同时读取,一旦读取,它将自动从队列中删除。因此,我建议使用 AWS SQS 来实现您的功能。

我通过为每个消息实现自己的连接、会话和生产者而不是使用 JMSTemplate 来做到这一点。

最新更新