如何设置ActiveMQ Artemis重新运输策略不阻止队列



背景

我在Apache Artemis上有JMS消息队列2.7.0.Redhat-00056。经纪人配置为10分钟的redelivery-delay。如果我向队列发布一条消息,并且在消费者上失败了,那么它将返回队列作为预定消息,以在10分钟的时间内传递。任何随后发布的消息都是直接处理的,因此队列不会被计划的消息阻止。

如果快速连续发送了许多消息,那么会发生什么都失败并安排了10分钟的时间。在这种情况下,看起来Artemis试图保留消息顺序。

文档

Redelivery上的文档说以下内容:

其他后续消息将定期发送,只有被取消的消息将异步发送回延迟后的队列。

重新传递文档

问题

对我来说似乎不一致,如果您近距离发布消息,Artemis似乎保留了订单,而如果消息之间有轻微的延迟(根据文档(。

我正在尝试找到一个解决方案,以便如果一条消息失败并且需要在10分钟内重新送达,以免阻止后续消息。

示例

重新创建它不需要任何特别的东西。如所述,您只需要快速连续发送一些消息,以在经纪人上具有重新交付政策的队列。我一直在使用一个基本示例进行测试:

Spring Boot应用程序在启动时生成五个消息。

@SpringBootApplication
public class ArtemisTestApplication
{
    private Logger logger = LoggerFactory.getLogger(ArtemisTestApplication.class);
    @Autowired
    private JmsTemplate jmsTemplate;
    @PostConstruct
    public void init()
    {
        send("Message1");
        send("Message2");
        send("Message3");
        send("Message4");
        send("Message5");
    }
    public void send(String msg)
    {
        logger.debug("Sending message :{}", msg);
        jmsTemplate.convertAndSend("jms.queue.TestQueue", msg);
    }
    public static void main(String[] args)
    {
        SpringApplication.run(ArtemisTestApplication.class, args);
    }
}

消耗消息并发出错误以触发重新运输策略。

@Component
public class TestConsumer
{
    private Logger logger = LoggerFactory.getLogger(TestConsumer.class);
    @JmsListener(destination = "jms.queue.TestQueue")
    public void receive(TextMessage message) throws JMSException
    {
        logger.debug("Message received: {}", message.getText());
        throw new RuntimeException("Force redelivery policy");
    }
}

使用Spring Boot Initileizr生成该应用程序。除了给它起一个名字,唯一选择的是消息传递下的Artemis依赖。

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-artemis</artifactId>
    </dependency>

在应用程序中。

spring.artemis.mode=native
spring.artemis.host=localhost
spring.artemis.port=61616
spring.artemis.user=
spring.artemis.password=

,在经纪人上,我已经配置了重新交付策略的队列。注意:我将延迟设置为0,问题仍然存在,因为所有消息都被阻止,直到第一条消息进行了三次尝试并移至DLQ。如果将延迟更改为正数,则会看到所有五个消息都安排在以后交付。

<address-settings>      
    <address-setting match="jms.queue.TestQueue">            
        <dead-letter-address>DLQ</dead-letter-address>                      
        <redelivery-delay>0</redelivery-delay>    
        <max-delivery-attempts>3</max-delivery-attempts>
    </address-setting>    
  </address-settings>
<addresses>     
    <address name="DLQ">            
        <anycast>               
          <queue name="DLQ" />            
        </anycast>         
      </address>       
      <address name="jms.queue.TestQueue">            
        <anycast>               
          <queue name="jms.queue.TestQueue" />            
        </anycast>         
      </address>                      
</addresses>

我得出的结论是,这是Artemis的错误。我为此提出了一张票,并给其他人遇到同样的问题的评论。

https://issues.apache.org/jira/browse/artemis-2417

与此同时,我不得不更改我们的客户端应用程序来处理重新交付策略本身。如果有错误阅读消息,则我们将消息上的计数器递增,并将其写入所需延迟的新消息。然后确认所消耗的消息以解开队列并允许阅读其他消息。我已经将在经纪人上配置的重新交付策略留下了,以防该逻辑之外存在错误或未捕获的内容。这不是理想的,但至少现在满足了要求。

当我对错误:ActiveMQ Classic在客户端ActiveMQConnectionFactory上的转换:

上都具有相同功能的切换:

activeMqConnectionFactory.setNonBlockingRedelivery(true)

观察到的行为背后的意图是保持消息顺序:假设第一条消息实际上通过了第二次尝试(错误是暂时的(:为了在队列上保持订购,直到所有尝试之前,您都无法传递更多消息因为当前消息已经用尽。对于MATS3 以消息为导向的异步RPC 库,这是没有意义的,因为它明确(显然是由于库语义(无法保证通过a&quot" MATS Flow"订购。 - 所以当我终于找到配置参数时,我很高兴。

所以,虽然这有意义。订购消息 - 特别是对于您有单个消费者排队的情况 - 如果Artemis也具有这样的功能可以关闭此功能,那就太好了,只需继续前进并传递下一条消息即可。/p>

(显然,如果启用了非封锁,则显然是在预摘要的消息列表的末尾将"滚回消息"放置在"消息超过了预取限制,它仍然必须在获取任何新消息之前"(尝试重新传播(这些消息。

最新更新