amqp.rabbit.core.RabbitTemplate.send(Message message) returns after 20-40 ms, even after the listener has received the message



I have a sample application that uses the following dependency:

<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-amqp</artifactId>
<version>1.4.5.RELEASE</version>
</dependency>

I have a Rabbit sender configured as follows:

<!-- Create rabbitMQ connection factory -->
<rabbit:connection-factory id="rabbitConnectionFactory"
cache-mode="${rabbitmq.cacheMode}" channel-cache-size="${rabbitmq.channelCacheSize}"
host="${rabbitmq.host}" username="${rabbitmq.username}" password="${rabbitmq.password}"
port="${rabbitmq.port}" />
<bean id="amqpTemplate" class="org.springframework.amqp.rabbit.core.RabbitTemplate"
scope="prototype">
<property name="connectionFactory" ref="rabbitConnectionFactory" />
<property name="channelTransacted" value="true" />
<property name="exchange" value="wfExchange" />
<property name="routingKey" value="abcd" />
</bean>

The configuration values are read from a properties file. A listener is configured as follows:

<!-- Enable Annotation Driven Configuration -->
<rabbit:connection-factory id="rabbitConnectionFactory"
cache-mode="${rabbitmq.cacheMode}" channel-cache-size="${rabbitmq.channelCacheSize}"
host="${rabbitmq.host}" username="${rabbitmq.username}" password="${rabbitmq.password}"
port="${rabbitmq.port}" />
<!-- this bean looks for beans of type Queue, Exchange and Binding and declares 
them to the broker -->
<rabbit:admin connection-factory="rabbitConnectionFactory"
auto-startup="true" />
<!-- define event queue -->
<rabbit:queue name="abcd" auto-delete="false"
durable="true" exclusive="false" auto-declare="true">
</rabbit:queue>
<rabbit:direct-exchange name="wfExchange"
durable="true" auto-delete="false">
<rabbit:bindings>
<rabbit:binding queue="abcd" key="abcd" />
</rabbit:bindings>
</rabbit:direct-exchange>
<!-- Instantiate event listener and listener container -->
<bean id="eventListener" class="com.sample.Listener" />
<rabbit:listener-container id="eventListenerContainer"
connection-factory="rabbitConnectionFactory" prefetch="10" acknowledge="manual">
<rabbit:listener ref="eventListener" queue-names="abcd" />
</rabbit:listener-container>
</beans>

In the application context configuration, I load both files:

<import resource="classpath:spring/sender.xml" />
<import resource="classpath:spring/listener.xml" />

When I run the application, it sends 10 messages in a loop:

for(int i=0; i<10; i++) {
String messageBody = "Hello number: " + (i+1);
Message message1 = MessageBuilder
.withBody(messageBody.getBytes())
.build();
System.out.println("Message before send: "+messageBody+ " at "+ new Date().getTime());
entryPoint.user1.getRabbitTemplate().send(message1);
System.out.println("Message sent: "+messageBody+ " at "+ new Date().getTime());
}

My listener code is:

@Override
public void onMessage(Message message, Channel channel) throws Exception {
System.out.println("Received the message: " + new String(message.getBody()) + " at " + 
new Date().getTime());
messageList.add(message);
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}

Notes:

  1. The RabbitTemplate is configured with: this.rabbitTemplate.setRoutingKey("abcd"); this.rabbitTemplate.setExchange("wfExchange");

  2. Installed RabbitMQ version: 3.5.4

  3. rabbitmq.cacheMode=CHANNEL and rabbitmq.channelCacheSize=10

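Over multiple runs I observed that send() returns after 20-40 ms, and that it returns only after the listener has already received the message. Why does send() take so long to return? It is slowing my application down. Does send() do anything else besides publishing? How can I reduce the overhead of send()?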

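I have never seen it take 40 ms, but RabbitMQ transactions are very slow. With channelTransacted set to true, each send() waits for the broker to commit before it returns. Disabling transactions on the sending side and using publisher confirms/returns instead is much faster.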
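
As a rough sketch of what that change might look like in the sender configuration from the question: transactions are turned off on the template, and confirms and returns are enabled on the connection factory. The two callback beans and their classes (com.sample.ConfirmLogger and com.sample.ReturnLogger) are hypothetical names used for illustration; they would need to implement RabbitTemplate.ConfirmCallback and RabbitTemplate.ReturnCallback respectively.

```xml
<!-- Same connection factory as in the question, with publisher confirms and returns enabled -->
<rabbit:connection-factory id="rabbitConnectionFactory"
    cache-mode="${rabbitmq.cacheMode}" channel-cache-size="${rabbitmq.channelCacheSize}"
    host="${rabbitmq.host}" username="${rabbitmq.username}" password="${rabbitmq.password}"
    port="${rabbitmq.port}"
    publisher-confirms="true" publisher-returns="true" />

<!-- Hypothetical callback beans (assumed classes, not part of the original application) -->
<bean id="confirmCallback" class="com.sample.ConfirmLogger" />
<bean id="returnCallback" class="com.sample.ReturnLogger" />

<bean id="amqpTemplate" class="org.springframework.amqp.rabbit.core.RabbitTemplate"
    scope="prototype">
    <property name="connectionFactory" ref="rabbitConnectionFactory" />
    <!-- No transaction commit per send -->
    <property name="channelTransacted" value="false" />
    <!-- mandatory=true asks the broker to return unroutable messages -->
    <property name="mandatory" value="true" />
    <property name="confirmCallback" ref="confirmCallback" />
    <property name="returnCallback" ref="returnCallback" />
    <property name="exchange" value="wfExchange" />
    <property name="routingKey" value="abcd" />
</bean>
```

With this setup the broker's acks and nacks arrive asynchronously in the confirm callback, so send() no longer blocks on a commit; any handling of failed publishes has to move into the callbacks.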
