我们有一个独立的java应用程序在Debian机器上做一些后台处理。它必须处理的任务是通过RabbitMQ消息发送的。
当java应用程序需要升级时,我们需要停止它(杀死它)。但是我们必须确保当前没有消费者正在处理消息。根据你的经验,实现这一目标的最佳方式是什么?我们尝试向消费者发送'SHUTDOWN'消息,但我们似乎无法关闭队列或通道?!它会冻结应用程序!或者是否有另一种解决方案,例如我们可以自动关闭应用程序,而无需在linux中执行kill命令?
谢谢分享你的经验。
问候
RabbitMQ Java库不提供(AFAIK)任何会在一些消息仍在处理时自动延迟关闭消费者进程的东西。因此,你必须自己做这件事。
如果您的应用程序可以容忍它,就关闭它。此时未被确认的任何消息将保留在代理内的队列中,并将在消费者返回时重新交付。
如果你不能容忍这一点,你必须确保所有正在进行的消息处理完成,那么你需要遵循类似于这个答案中的建议,并在你的shutdown处理程序中执行以下操作:
- 设置"所有线程应该退出"标志为true
- 与线程池中的每个线程连接 优雅地退出
这意味着您的每个消息处理线程(假设您有多个并发处理消息的线程)都需要遵循这个通用模式:
- 从队列中取出一条消息并处理它
- 确认刚刚处理的消息
- 如果"all threads should exit"标志为true,退出线程函数
- 冲洗,重复
希望对你有帮助。
这是我的看法。
我创建了DefaultConsumer (BasicConsumer)的子类,它提供了isCancelled()并实现了handleCancelOk(),它将"取消标志"设置为true。
开始序列:
consumers = new ArrayList<BasicConsumer>();
consumers.add(...)
停止序列:
// Cancel all consumers
for (BasicConsumer consumer : consumers) {
try {
consumer.getChannel().basicCancel(consumer.getConsumerTag());
} catch (Exception e) {
// report
}
}
// Wait for all consumers to be cancelled
Timeout timeout = ...;
while (!consumers.isEmpty() && !timeout.isElapsed()) {
// Remove cancelled consumers
for (Iterator<BasicConsumer> iterator = consumers.iterator(); iterator.hasNext();) {
if (iterator.next().isCancelled())
iterator.remove();
}
}
// Here we could force-close the remaining timed-out consumers if we
// used our own ExecutorService by shutting down all of its threads.
connection.close();
相关RabbitMQ ML线程:如何使用consumer干净地关闭Java应用程序?