背景
我在Apache Artemis上有JMS消息队列2.7.0.Redhat-00056。经纪人配置为10分钟的redelivery-delay
。如果我向队列发布一条消息,并且在消费者上失败了,那么它将返回队列作为预定消息,以在10分钟的时间内传递。任何随后发布的消息都是直接处理的,因此队列不会被计划的消息阻止。
如果快速连续发送了许多消息,那么会发生什么都失败并安排了10分钟的时间。在这种情况下,看起来Artemis试图保留消息顺序。
文档
Redelivery上的文档说以下内容:
其他后续消息将定期发送,只有被取消的消息将异步发送回延迟后的队列。
重新传递文档
问题
对我来说似乎不一致,如果您近距离发布消息,Artemis似乎保留了订单,而如果消息之间有轻微的延迟(根据文档(。
我正在尝试找到一个解决方案,以便如果一条消息失败并且需要在10分钟内重新送达,以免阻止后续消息。
示例
重新创建它不需要任何特别的东西。如所述,您只需要快速连续发送一些消息,以在经纪人上具有重新交付政策的队列。我一直在使用一个基本示例进行测试:
Spring Boot应用程序在启动时生成五个消息。
@SpringBootApplication
public class ArtemisTestApplication
{
private Logger logger = LoggerFactory.getLogger(ArtemisTestApplication.class);
@Autowired
private JmsTemplate jmsTemplate;
@PostConstruct
public void init()
{
send("Message1");
send("Message2");
send("Message3");
send("Message4");
send("Message5");
}
public void send(String msg)
{
logger.debug("Sending message :{}", msg);
jmsTemplate.convertAndSend("jms.queue.TestQueue", msg);
}
public static void main(String[] args)
{
SpringApplication.run(ArtemisTestApplication.class, args);
}
}
消耗消息并发出错误以触发重新运输策略。
@Component
public class TestConsumer
{
private Logger logger = LoggerFactory.getLogger(TestConsumer.class);
@JmsListener(destination = "jms.queue.TestQueue")
public void receive(TextMessage message) throws JMSException
{
logger.debug("Message received: {}", message.getText());
throw new RuntimeException("Force redelivery policy");
}
}
使用Spring Boot Initileizr生成该应用程序。除了给它起一个名字,唯一选择的是消息传递下的Artemis依赖。
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-artemis</artifactId>
</dependency>
在应用程序中。
spring.artemis.mode=native
spring.artemis.host=localhost
spring.artemis.port=61616
spring.artemis.user=
spring.artemis.password=
,在经纪人上,我已经配置了重新交付策略的队列。注意:我将延迟设置为0,问题仍然存在,因为所有消息都被阻止,直到第一条消息进行了三次尝试并移至DLQ。如果将延迟更改为正数,则会看到所有五个消息都安排在以后交付。
<address-settings>
<address-setting match="jms.queue.TestQueue">
<dead-letter-address>DLQ</dead-letter-address>
<redelivery-delay>0</redelivery-delay>
<max-delivery-attempts>3</max-delivery-attempts>
</address-setting>
</address-settings>
<addresses>
<address name="DLQ">
<anycast>
<queue name="DLQ" />
</anycast>
</address>
<address name="jms.queue.TestQueue">
<anycast>
<queue name="jms.queue.TestQueue" />
</anycast>
</address>
</addresses>
我得出的结论是,这是Artemis的错误。我为此提出了一张票,并给其他人遇到同样的问题的评论。
https://issues.apache.org/jira/browse/artemis-2417
与此同时,我不得不更改我们的客户端应用程序来处理重新交付策略本身。如果有错误阅读消息,则我们将消息上的计数器递增,并将其写入所需延迟的新消息。然后确认所消耗的消息以解开队列并允许阅读其他消息。我已经将在经纪人上配置的重新交付策略留下了,以防该逻辑之外存在错误或未捕获的内容。这不是理想的,但至少现在满足了要求。
当我对错误:ActiveMQ Classic在客户端ActiveMQConnectionFactory上的转换:
上都具有相同功能的切换: activeMqConnectionFactory.setNonBlockingRedelivery(true)
观察到的行为背后的意图是保持消息顺序:假设第一条消息实际上通过了第二次尝试(错误是暂时的(:为了在队列上保持订购,直到所有尝试之前,您都无法传递更多消息因为当前消息已经用尽。对于MATS3 以消息为导向的异步RPC 库,这是没有意义的,因为它明确(显然是由于库语义(无法保证通过a&quot" MATS Flow"订购。 - 所以当我终于找到配置参数时,我很高兴。
所以,虽然这有意义。订购消息 - 特别是对于您有单个消费者排队的情况 - 如果Artemis也具有这样的功能可以关闭此功能,那就太好了,只需继续前进并传递下一条消息即可。/p>
(显然,如果启用了非封锁,则显然是在预摘要的消息列表的末尾将"滚回消息"放置在"消息超过了预取限制,它仍然必须在获取任何新消息之前"(尝试重新传播(这些消息。