DefaultJmsListenerContainerFactory: configuring concurrency



I am configuring concurrency on a JMS listener container factory so that it can consume from my queues.

This is what I did:

@Bean(name = "myFactory")
public DefaultJmsListenerContainerFactory sqsFactory(SQSConnectionFactory connectionFactory,
        CustomJmsListenerConfigurer configurer) {
    DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
    configurer.configure(factory, connectionFactory);
    factory.setConcurrency("1-3");
    return factory;
}

I see that DefaultJmsListenerContainerFactory.setConcurrency calls through to the underlying DefaultMessageListenerContainer.

My application has 2 queues configured, and I am using Spring Boot:

@JmsListener(destination = "queue1", containerFactory = "myFactory")
@JmsListener(destination = "queue2", containerFactory = "myFactory")

While reading the Spring documentation I came across a few methods, and now I have some questions.

1 - What is the difference between

setConcurrency(String concurrency)
setConcurrentConsumers(int concurrentConsumers)

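Even after reading the documentation, I don't understand the difference or how this configuration changes the application's behavior. I thought setConcurrency was the number of threads each @JmsListener would use to fetch messages from the queue. Could you explain it with an example, assuming I have 100 messages waiting in each configured queue?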

2 - setMaxMessagesPerTask(int maxMessagesPerTask)

If there are 100 messages in the queue, concurrency = 3, and this number is 10 (the default), what is the behavior?

Read the Javadocs for both cases.

1.

setConcurrency(String concurrency)

This is just a convenience method:

/**
 * Specify concurrency limits via a "lower-upper" String, e.g. "5-10", or a simple
 * upper limit String, e.g. "10" (the lower limit will be 1 in this case).
 * <p>This listener container will always hold on to the minimum number of consumers
 * ({@link #setConcurrentConsumers}) and will slowly scale up to the maximum number
 * of consumers {@link #setMaxConcurrentConsumers} in case of increasing load.
 */
@Override
public void setConcurrency(String concurrency) {
    try {
        int separatorIndex = concurrency.indexOf('-');
        if (separatorIndex != -1) {
            setConcurrentConsumers(Integer.parseInt(concurrency.substring(0, separatorIndex)));
            setMaxConcurrentConsumers(Integer.parseInt(concurrency.substring(separatorIndex + 1)));
        }
        else {
            setConcurrentConsumers(1);
            setMaxConcurrentConsumers(Integer.parseInt(concurrency));
        }
    }
    catch (NumberFormatException ex) {
        throw new IllegalArgumentException("Invalid concurrency value [" + concurrency + "]: only " +
                "single maximum integer (e.g. \"5\") and minimum-maximum combo (e.g. \"3-5\") supported.");
    }
}

setConcurrency("1-3") is the same as

setConcurrentConsumers(1);
setMaxConcurrentConsumers(3);
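
To make the mapping concrete, here is a minimal standalone sketch that applies the same parsing rule as the source quoted above, without depending on Spring. The class name ConcurrencyLimits and the parse helper are hypothetical names for illustration only; they are not part of the Spring API.

```java
public class ConcurrencyLimits {

    /**
     * Returns {min, max} for a concurrency string, following the rule in the
     * setConcurrency source quoted above. Hypothetical helper, not a Spring API.
     */
    public static int[] parse(String concurrency) {
        int separatorIndex = concurrency.indexOf('-');
        if (separatorIndex != -1) {
            // "lower-upper" form: both limits are given explicitly.
            int min = Integer.parseInt(concurrency.substring(0, separatorIndex));
            int max = Integer.parseInt(concurrency.substring(separatorIndex + 1));
            return new int[] {min, max};
        }
        // A single number is only an upper limit; the lower limit defaults to 1.
        return new int[] {1, Integer.parseInt(concurrency)};
    }

    public static void main(String[] args) {
        for (String value : new String[] {"1-3", "3", "5-10"}) {
            int[] limits = parse(value);
            System.out.println(value + " -> concurrentConsumers=" + limits[0]
                    + ", maxConcurrentConsumers=" + limits[1]);
        }
    }
}
```

Applied to the setup in the question, and assuming each @JmsListener gets its own container from the factory, queue1 and queue2 would each start with 1 consumer and could scale up to 3 under load.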
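
2. This has no effect on how messages are processed; it only means that the consumer task is recycled (stopped and restarted) after every 10 receive attempts. As the Javadoc below notes, attempts that time out without picking up a message also count toward that limit.
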
/**
 * Specify the maximum number of messages to process in one task.
 * More concretely, this limits the number of message reception attempts
 * per task, which includes receive iterations that did not actually
 * pick up a message until they hit their timeout (see the
 * {@link #setReceiveTimeout "receiveTimeout"} property).
 * <p>Default is unlimited (-1) in case of a standard TaskExecutor,
 * reusing the original invoker threads until shutdown (at the
 * expense of limited dynamic scheduling).
 * <p>In case of a SchedulingTaskExecutor indicating a preference for
 * short-lived tasks, the default is 10 instead. Specify a number
 * of 10 to 100 messages to balance between rather long-lived and
 * rather short-lived tasks here.
 * <p>Long-lived tasks avoid frequent thread context switches through
 * sticking with the same thread all the way through, while short-lived
 * tasks allow thread pools to control the scheduling. Hence, thread
 * pools will usually prefer short-lived tasks.
 * <p><b>This setting can be modified at runtime, for example through JMX.</b>
 * @see #setTaskExecutor
 * @see #setReceiveTimeout
 * @see org.springframework.scheduling.SchedulingTaskExecutor#prefersShortLivedTasks()
 */
public void setMaxMessagesPerTask(int maxMessagesPerTask) {
    Assert.isTrue(maxMessagesPerTask != 0, "'maxMessagesPerTask' must not be 0");
    synchronized (this.lifecycleMonitor) {
        this.maxMessagesPerTask = maxMessagesPerTask;
    }
}
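
The recycling behavior can be illustrated with a deliberately simplified model: a single consumer, an in-memory queue, and no receive timeouts. This is only a sketch of the idea, not Spring's actual invoker logic; the class TaskRecyclingSketch and its drain method are hypothetical names.

```java
import java.util.ArrayDeque;
import java.util.Queue;

public class TaskRecyclingSketch {

    /**
     * Drains messageCount messages with one simulated consumer and returns
     * {messagesProcessed, tasksStarted}. A negative limit means "unlimited".
     * Simplified model only; it does not reproduce Spring's scheduling.
     */
    public static int[] drain(int messageCount, int maxMessagesPerTask) {
        if (maxMessagesPerTask == 0) {
            // Mirrors the assertion in the quoted setter: 0 is not allowed.
            throw new IllegalArgumentException("'maxMessagesPerTask' must not be 0");
        }
        Queue<Integer> queue = new ArrayDeque<>();
        for (int i = 0; i < messageCount; i++) {
            queue.add(i);
        }

        int processed = 0;
        int tasksStarted = 0;
        while (!queue.isEmpty()) {
            tasksStarted++; // a fresh consumer task is scheduled
            int attempts = 0;
            // One task keeps receiving until it reaches its attempt limit.
            while (!queue.isEmpty()
                    && (maxMessagesPerTask < 0 || attempts < maxMessagesPerTask)) {
                queue.poll();
                processed++;
                attempts++;
            }
            // The task ends here; the outer loop models it being rescheduled.
        }
        return new int[] {processed, tasksStarted};
    }

    public static void main(String[] args) {
        int[] limited = drain(100, 10);
        int[] unlimited = drain(100, -1);
        System.out.println("limit 10: processed=" + limited[0] + ", tasks=" + limited[1]);
        System.out.println("unlimited: processed=" + unlimited[0] + ", tasks=" + unlimited[1]);
    }
}
```

In this model all 100 messages are processed either way; the limit changes only how many times the consumer task is started, which is the point made in the answer above.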
