安全结束运行RabbitMQ消费者的java应用程序的最好方法是什么?



我们有一个独立的java应用程序在Debian机器上做一些后台处理。它必须处理的任务是通过RabbitMQ消息发送的。

当java应用程序需要升级时,我们需要停止它(杀死它)。但是我们必须确保当前没有消费者正在处理消息。根据你的经验,实现这一目标的最佳方式是什么?

我们尝试向消费者发送'SHUTDOWN'消息,但我们似乎无法关闭队列或通道?!它会冻结应用程序!或者是否有另一种解决方案,例如我们可以自动关闭应用程序,而无需在linux中执行kill命令?

谢谢分享你的经验。

问候

RabbitMQ Java库不提供(AFAIK)任何会在一些消息仍在处理时自动延迟关闭消费者进程的东西。因此,你必须自己做这件事。

如果您的应用程序可以容忍它,就关闭它。此时未被确认的任何消息将保留在代理内的队列中,并将在消费者返回时重新交付。

如果你不能容忍这一点,你必须确保所有正在进行的消息处理完成,那么你需要遵循类似于这个答案中的建议,并在你的shutdown处理程序中执行以下操作:

  1. 设置"所有线程应该退出"标志为true
  2. 与线程池中的每个线程连接
  3. 优雅地退出

这意味着您的每个消息处理线程(假设您有多个并发处理消息的线程)都需要遵循这个通用模式:

  1. 从队列中取出一条消息并处理它
  2. 确认刚刚处理的消息
  3. 如果"all threads should exit"标志为true,退出线程函数
  4. 冲洗,重复

希望对你有帮助。

这是我的看法。

我创建了DefaultConsumer (BasicConsumer)的子类,它提供了isCancelled()并实现了handleCancelOk(),它将"取消标志"设置为true。

开始序列:

consumers = new ArrayList<BasicConsumer>();
consumers.add(...)

停止序列:

// Cancel all consumers
for (BasicConsumer consumer : consumers) {
  try {
    consumer.getChannel().basicCancel(consumer.getConsumerTag());
  } catch (Exception e) {
    // report
  }
}
// Wait for all consumers to be cancelled
Timeout timeout = ...;
while (!consumers.isEmpty() && !timeout.isElapsed()) {
  // Remove cancelled consumers
  for (Iterator<BasicConsumer> iterator = consumers.iterator(); iterator.hasNext();) {
    if (iterator.next().isCancelled())
      iterator.remove();
  }
}
// Here we could force-close the remaining timed-out consumers if we
// used our own ExecutorService by shutting down all of its threads.
connection.close();

相关RabbitMQ ML线程:如何使用consumer干净地关闭Java应用程序?

相关内容

最新更新