Spring JMS 负载均衡,一个生产者和一个 Concumer



我已经研究了很长时间关于JMS负载平衡。我们可以创建多个生产者和多个消费者来对 JMS 消息进行负载均衡。但我想了解,我们如何用一个生产者和一个消费者对 JMS 消息进行负载均衡。我不能像Apache Camel那样在我的项目中添加更多的依赖项。

@Configuration
@EnableJms
@ComponentScan({"com.jmsloadbalance.jms"})
@Bean
public class JmsConfig {
public JmsTemplate getJmsTemplate() {
JmsTemplate template = new JmsTemplate();
template.setConnectionFactory(connectionFactory());
template.setDefaultDestination(new ActiveMQQueue("default.topic");
template.setExplicitQosEnabled(true);
template.setDeliveryPersistent(false);
template.setTimeToLive(60000);
template.setSessionAcknowledgeMode(Session.AUTO_ACKNOWLEDGE);
template.setMessageConverter(getMessageConverter());
return template;
}
@Bean
public DefaultJmsListenerContainerFactory defaultJmsListenerContainerFactory() {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
factory.setConnectionFactory(connectionFactory());
factory.setPubSubDomain(false);
factory.setDestinationResolver(new DynamicDestinationResolver());
factory.setConcurrency("1");
factory.setMessageConverter(getMessageConverter());
return factory;
}
private ActiveMQConnectionFactory connectionFactory() {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
factory.setBrokerURL("vm://localhost");
return factory;
}
private MessageConverter getMessageConverter() {
MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
converter.setTypeIdPropertyName("JMSType");
return converter;
}
}

这是我的 JmsConfig 类,我不能进行大的配置更改,例如引入更多的 JMSTemplate 或更多的 ConnectionFactory。我的制作人如下所示

@Service("accountJmsProducer")
public class AccountJmsProducer {
private static Logger LOG = Logger.getLogger(AccountJmsProducer.class);
@Autowired
private JmsTemplate template;
private Destination destination;
public Account create(Account account) {
if (this.destination == null) {
this.destination = new ActiveMQQueue("account.create");
}
template.convertAndSend(destination, account);
return null;
}
}

我的使用者如下所示:

@Service("accountJmsConsumer")
public class AccountJmsConsumer {
private static final Logger LOG = Logger.getLogger(AccountJmsConsumer.class);
@Autowired
@Qualifier("accountService")
private AccountService accountService;
private Account lastReceived;
@JmsListener(containerFactory = "defaultJmsListenerContainerFactory", destination = "account.create")
public Account create(Account account) {
LOG.warn("Received " + account);
setLastReceived(account);
return accountService.create(account);
}
public synchronized Account getLastReceived() {
return lastReceived;
}
public synchronized void setLastReceived(Account lastReceived) {
this.lastReceived = lastReceived;
}
}

当有一个消费者时,不清楚负载均衡是什么意思,但根据您对我对您问题的评论的评论:

只要目标是队列(不是主题(,并且由于您factory.setPubSubDomain(false)而暗示了这一点,那么它就会起作用。这是JMS合同的一部分。如果同一队列中有多个使用者,则消息将分布在这些使用者之间;只有一个使用者会收到特定消息。

如果交付失败,则可能会或可能不会将其重新交付给同一消费者。

大多数代理(包括ActiveMQ(都提供某种预取机制。IIRC,使用ActiveMQ时,默认情况下为1000。如果消息少于此值,则一个使用者可能处于空闲状态;如果是这样,请减少预取以调整分布。

最新更新