Rabbitmq 出现故障时的回退机制



My Spring Boot 应用程序通过 RabbitMQ 将事件发送到 Timescale。 我需要知道如何在 RabbitMQ 出现故障时保存我的事件。

详细地:

RabbitMQ 发布的消息是持久性的。当消息代理关闭时,事件不会发布,我计划将这些事件存储在数据库中,并在 RabbitMQ 启动时再次发布它们。 欢迎任何解决方案和建议。

弹性、并发、高效、可移植和本地

正如 Kayaman 所建议的那样,如果保留消息至关重要,您应该使用弹性、并发、高效且最好是本地(在同一台机器上(的数据库系统。

  • 如果 RabbitMQ 不可用,原因很可能与网络中断有关。因此,如果可能,您的回退机制应在同一台计算机上本地。
  • 我们不想让您的本地服务器负担过重。因此,理想情况下,用作回退的数据库应该有效地使用 RAM、CPU 和存储。
  • 如果对 RabbitMQ 的访问连续反复失败,我们可能会将消息添加到我们的回退数据库中,同时在时间上与将记录的消息移出数据库以返回到 RabbitMQ 的时间重叠。因此,回退机制应该能够处理并发访问。
  • 如果保留这些消息至关重要,那么我们的回退数据库应该是有弹性的,能够容忍崩溃/断电。
  • 如果重新部署服务器,如果回退机制是可移植的,不依赖于任何特定的操作系统或 CPU 指令集,那就太好了。因此,基于Java可以解决这个问题。

H2 数据库引擎

满足这些需求的第一个想法是使用H2 数据库引擎。H2 是一个关系数据库,用纯 Java 构建,积极开发,并被证明是值得生产的。

要考虑的类似产品是Apache Derby。但我听说过各种问题,可能意味着它不值得生产,尽管你应该研究它的现状。

H2 的关系部分可能不相关,因为您可能只需要一个表来跟踪稍后要重新发送到 RabbitMQ 的消息流。至于本地、高效、弹性、可移植和并发的其他要求,H2 符合要求。如果您需要添加其他表,H2 已准备好作为一个完全关系的数据库系统。

H2 可以通过以下两种模式之一启动:

  • 数据库服务器,能够从各种进程和计算机获取连接。
  • 嵌入式数据库,仅允许来自同一 Java 应用程序内的连接。

如果没有更多信息,很难说哪种模式适合您的需求。如果要附加外部监视工具,则可能需要服务器模式。如果你想要简单和精益,那么嵌入式模式。

您的 Java 应用程序通过随附的 JDBC 驱动程序连接到 H2。

Rabbit 有自己的持久性,您可以配置队列,以便它们在关机后持续存在,并立即返回失败前的相同消息。

扩展一下我的评论:实际上没有必要使用数据库进行持久缓冲。

编辑:此外,无法向RabbitMQ发送消息的原因很可能是网络连接丢失。在这种情况下,大多数 DBMS 几乎没有用处。编辑结束。

package io.mahlberg.stackoverflow.questions.objpersistencedemo;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.time.Duration;
import java.time.Instant;
import java.util.Date;
public class PersistenceDemo {
/*
* A decent default number of objects to be buffered.
*/
private static long COUNT = 10000L;
/*
* Default filename
*/
private static final String OBJ_DAT = "./obj.dat";
private static FileOutputStream   fos;
private static ObjectOutputStream oos;
private static FileInputStream   fis;
private static ObjectInputStream ois;
private static File dat;
private static Object lock;
public static void main(String[] args) throws InterruptedException, IOException {
// Get the actual number of counts
if (args[0] != null) {
COUNT = Long.parseLong(args[0]);
}
// Initialize out lock
lock = new Object();
// Ensure the datafile exists.
dat = new File(OBJ_DAT);
dat.createNewFile();
// Initialize our streams.
try {
fos = new FileOutputStream(dat);
} catch (Exception e1) {
e1.printStackTrace();
System.exit(1);
}
oos = new ObjectOutputStream(fos);
// Define the writer thread.
Thread writer = new Thread(new Runnable() {
public void run() {
Data obj;
// Make sure we have the behaviour of the queue.
synchronized (lock) {
for (int i = 0; i < COUNT; i++) {
obj = new Data(String.format("Obj-%d", i), new Date());
try {
oos.writeObject(obj);
oos.flush();
fos.flush();
// Notify the reader...
lock.notify();
} catch (IOException e1) {
// TODO Auto-generated catch block
e1.printStackTrace();
}
try {
// ... and wait until the reader is finished.
lock.wait();
} catch (InterruptedException e) {
e.printStackTrace();
break;
}
}
// We need to notify the reader one last time for the last
// Object we put into the stream.
lock.notify();
}
}
});
// Initialize the streams used by reader.
fis = new FileInputStream(dat);
ois = new ObjectInputStream(fis);
Thread reader = new Thread(new Runnable() {
public void run() {
Data obj;
while (true) {
synchronized (lock) {
try {
obj = (Data) ois.readObject();
// Notify writer we are finished with reading the latest entry...
lock.notify();
} catch (ClassNotFoundException e1) {
e1.printStackTrace();
} catch (IOException e1) {
break;
}
try {
// ...and wait till writer is done writing.
lock.wait();
} catch (InterruptedException e) {
e.printStackTrace();
break;
}
}
}
}
});
// For doing a rough performance measurement.
Instant start = Instant.now();
writer.start();
reader.start();
// Wait till both threads are done.
writer.join();
reader.join();
Instant end = Instant.now();
Duration timeElapsed = Duration.between(start, end);
System.out.format("Took %sms for %d objectsn", timeElapsed.toMillis(), COUNT);
System.out.format("Avg: %.3fms/objectn", ((double) timeElapsed.toMillis() / COUNT));
// Cleanup
oos.close();
fos.close();
ois.close();
fis.close();
}
}

基本上,我们使用synchronizenotifywait来模拟带有文件的 FIFO 缓冲区。

请注意,为了简洁和可读性,我采取了一些快捷方式,但我想你明白了。阅读器应每隔一段时间检查一次文件大小(频率取决于数据的大小(并截断文件,并且实际上不存在错误处理。我从该类创建了一个 jar 和一个数据类,下面是一些示例结果:

$ for i in {1..10}; do java -jar target/objpersistencedemo-0.0.1-SNAPSHOT.jar 20000; done
20000
Took 1470ms for 20000 objects
Avg: 0,074ms/object
20000
Took 1510ms for 20000 objects
Avg: 0,076ms/object
20000
Took 1614ms for 20000 objects
Avg: 0,081ms/object
20000
Took 1600ms for 20000 objects
Avg: 0,080ms/object
20000
Took 1626ms for 20000 objects
Avg: 0,081ms/object
20000
Took 1620ms for 20000 objects
Avg: 0,081ms/object
20000
Took 1489ms for 20000 objects
Avg: 0,074ms/object
20000
Took 1604ms for 20000 objects
Avg: 0,080ms/object
20000
Took 1632ms for 20000 objects
Avg: 0,082ms/object
20000
Took 1564ms for 20000 objects
Avg: 0,078ms/object

请注意,这些值用于写入读取。我猜小于 0.1ms/对象比从 RDBMS 写入和随后读取要快得多。

如果你让你的读取器将消息发送到 RabbitMQ 实例,并添加一点截断和退避逻辑,你基本上可以确保你拥有的所有事件要么在缓冲区文件中,要么写入 RabbitMQ。

最新更新