>我正在发布消息到请求队列并将这些消息放入回复队列中。在这里,我想进一步处理所有收到的消息,因此我想在发送到我的处理器之前将所有消息添加到列表中(进行进一步的操作(。 下面是我的代码。
@Autowired
private GeneralProcess generalProcess;
List <RequestPojo> requestPojoGeneral = new ArrayList<RequestPojo>();
@RabbitHandler
@RabbitListener(containerFactory = "simpleMessageListenerContainerFactory", queues ="BulkSolve_GeneralrequestQueue")
public void subscribeToRequestQueue(@Payload RequestPojo sampleRequestMessage, Message message) throws InterruptedException {
requestPojoGeneral.add(sampleRequestMessage);
System.out.println("List size issssss:" +requestPojoGeneral.size() );
generalProcess.processRequestObjectslist(requestPojoGeneral);
/*System.out.println("message in general listener is:" + sampleRequestMessage.getDistance());
System.out.println("Message payload is:" + sampleRequestMessage);
System.out.println("Message payload1111 is:" + message );*/
}
上面的代码我无法一次获取所有消息。我必须等到我得到所有消息并填充列表,然后再调用generalProcess.processRequestObjectslist(requestPojoGeneral(;。任何人都可以建议等待调用处理器方法的最佳方法,直到获取所有消息并添加到列表中。
谢谢。
您可以在消息中添加标头,例如messageCount
.
然后
if ((int) message.getMessageProperties().getHeader("messageCount") == list.size() {
process();
}
(或其他一些技术,例如添加标头"lastMessage=true"(。
请记住,如果您的服务器在您获得完整列表之前崩溃,除非您将 aknowledgeMode 设置为MANUAL
,否则您将丢失消息。
然后,在处理完所有消息后,在频道上呼叫basicAck
以获取最后一条消息。
您可以访问Channel
作为方法参数,并且 deliveryTag 位于MessageProperties
中。
容器prefetchCount
必须至少与最大批次一样大,因为代理只允许该数量的未确认消息未完成。