ActiveMQ+NMS.未遵守预取限制



我有一个同步使用者通过这个URL连接到一个队列,它还设置了预取限制:

tcp://localhost:61616?nms.prefetchPolicy.all=5

我以编程方式检查限制并将其输出到日志中,这样我就知道语法是正确的,并且正在进行设置。

我对预取的理解是,同步消费者将收到x条消息,一旦它确认了其中的50%,就会收到另一批消息。这不是我所看到的行为。

  • 我已在URL中将预取设置为5
  • 我已经用20条消息预先填充了一个队列
  • 我记录收到的每一条消息
  • 我已经注释掉了代码中的message.Acknowledge();

所以我认为我应该在日志中看到5条消息,不再出现,因为我没有确认其中任何一条。然而,我看到所有20条消息都出现在日志中。

我是否误解了预取的工作方式,或者我的代码中有漏洞?

代码(c#,.Net Core(:

public Worker(LogWriter logger, ServiceConfiguration config, IConnectionFactory connectionFactory, IEndpointClient endpointClient)
{
log = logger;
configuration = config;
this.endpointClient = endpointClient;
connection = connectionFactory.CreateConnection();
connection.RedeliveryPolicy = GetRedeliveryPolicy();
connection.ExceptionListener += new ExceptionListener(OnException);
connection.Start();
session = connection.CreateSession(AcknowledgementMode.IndividualAcknowledge);
queue = session.GetQueue(configuration.JmsConfig.SourceQueueName);
consumer = session.CreateConsumer(queue);
log.InfoFormat("Prefetch set to:{0}. Maximum configurable:{1}", ((Connection)connection).PrefetchPolicy.QueuePrefetch, PrefetchPolicy.MAX_PREFETCH_SIZE);
while (true)
{
var message = consumer.Receive(TimeSpan.FromSeconds(5));
if (!Equals(message, null))
{
OnMessage(message);
}
}
}
public void OnMessage(IMessage message)
{
log.DebugFormat("Message {count} Received. Attempt:{attempt}", message.Properties.GetInt("count"), message.Properties.GetInt("NMSXDeliveryCount"));
//message.Acknowledge();
}

日志:

[18:07:37 INF] Prefetch set to:5. Maximum configurable:32766
[18:07:38 DBG] Message 0 Received. Attempt:2
[18:07:38 DBG] Message 1 Received. Attempt:2
[18:07:38 DBG] Message 2 Received. Attempt:2
[18:07:38 DBG] Message 3 Received. Attempt:2
[18:07:40 DBG] Message 4 Received. Attempt:2
[18:07:40 DBG] Message 5 Received. Attempt:2
[18:07:40 DBG] Message 6 Received. Attempt:2
[18:07:40 DBG] Message 7 Received. Attempt:2
[18:07:40 DBG] Message 8 Received. Attempt:2
[18:07:40 DBG] Message 9 Received. Attempt:2
[18:07:40 DBG] Message 10 Received. Attempt:2
[18:07:40 DBG] Message 11 Received. Attempt:2
[18:07:40 DBG] Message 12 Received. Attempt:2
[18:07:40 DBG] Message 13 Received. Attempt:2
[18:07:40 DBG] Message 14 Received. Attempt:2
[18:07:40 DBG] Message 15 Received. Attempt:2
[18:07:41 DBG] Message 16 Received. Attempt:2
[18:07:41 DBG] Message 17 Received. Attempt:2
[18:07:41 DBG] Message 18 Received. Attempt:2
[18:07:41 DBG] Message 19 Received. Attempt:2

提前感谢:(

附言:我正在使用:

  • ActiveMQ v5.15.12
  • Apache.NMS.ActiveMQ.NetCore v1.7.2(NuGet(

预取值并不意味着您可以控制队列调度,它只是控制在应用程序实际上没有进行任何接收处理或消耗缓慢时,将有多少缓冲等待交付给您。这允许代理向客户端提供一些消息,然后继续向其他客户端提供消息,这些消息在第一个客户端处理其预取时到达。

一旦您开始消费邮件,无论您是否确认,客户端都会延长预取信用额度,以允许始终存在预取金额大小的积压。这是因为作为消费者,您可能希望大批量读取许多消息,并一次确认所有消息,这就是客户端确认和事务消费的作用。NMS客户端将预取信用额度增加到70%(如果我记得正确的话(,而不是50%,但不管怎样,只要你从预取缓冲区中提取了那么多消息,就会发生这种情况。

如果您希望控制从代理发送的消息,则需要使用零预取设置和同步接收消费者,以确保在任何给定时间只有一条消息可以发送到客户端。

在broker配置中禁用usePrefetchExtension。默认情况下,代理将在没有ack响应的情况下调度消息。

根据文件:

当消息被传递但未被确认时,会使用预取扩展,这样代理就可以调度另一条消息,例如,预取==0,其思想是总是有预取数量的消息挂起。它还允许事务批处理超过预取值。

最新更新