如何将回复队列的所有接收消息添加到列表中



>我正在发布消息到请求队列并将这些消息放入回复队列中。在这里,我想进一步处理所有收到的消息,因此我想在发送到我的处理器之前将所有消息添加到列表中(进行进一步的操作(。 下面是我的代码。

@Autowired
private GeneralProcess generalProcess;
List <RequestPojo> requestPojoGeneral = new ArrayList<RequestPojo>();
@RabbitHandler
@RabbitListener(containerFactory = "simpleMessageListenerContainerFactory", queues ="BulkSolve_GeneralrequestQueue")
public void subscribeToRequestQueue(@Payload RequestPojo sampleRequestMessage, Message message) throws InterruptedException {

requestPojoGeneral.add(sampleRequestMessage);
System.out.println("List size issssss:" +requestPojoGeneral.size() );
generalProcess.processRequestObjectslist(requestPojoGeneral);
/*System.out.println("message in general listener is:" + sampleRequestMessage.getDistance());
System.out.println("Message payload is:" + sampleRequestMessage);
System.out.println("Message payload1111 is:" + message );*/
}

上面的代码我无法一次获取所有消息。我必须等到我得到所有消息并填充列表,然后再调用generalProcess.processRequestObjectslist(requestPojoGeneral(;。任何人都可以建议等待调用处理器方法的最佳方法,直到获取所有消息并添加到列表中。

谢谢。

您可以在消息中添加标头,例如messageCount.

然后

if ((int) message.getMessageProperties().getHeader("messageCount") == list.size() {
process();
}

(或其他一些技术,例如添加标头"lastMessage=true"(。

请记住,如果您的服务器在您获得完整列表之前崩溃,除非您将 aknowledgeMode 设置为MANUAL,否则您将丢失消息。

然后,在处理完所有消息后,在频道上呼叫basicAck以获取最后一条消息。

您可以访问Channel作为方法参数,并且 deliveryTag 位于MessageProperties中。

容器prefetchCount必须至少与最大批次一样大,因为代理只允许该数量的未确认消息未完成。