当状态类在akka持久性中发生变化时,是否可以重建快照



我们将akka persistence 2.6.15中的EventSourcedBehavior用于CQRS/EventSourcing应用程序,并将akka persistence jdbc 4.0.0用于在PostgreSQL数据库中存储事件和快照。

我们有用快照序列化的状态类。但有时这些状态类会发生变化,这使得读取快照明显失败。我们通过删除那些更改的快照来管理它:

delete from snapshot sn 
where sn.persistence_id::uuid in (select id from some_entity_table);       

但对于具有大量事件的实体,在发送新命令时,需要花费大量时间才能获取最新的快照,从而导致超时。

是否可以在应用程序启动时强制重新生成快照?

可以说;真";解决方案是使用SerializerWithStringManifest,它可以将以前的快照格式反序列化为当前格式。如果使用反射驱动的序列化,这可能会更加困难。它也不会迁移现有的快照。

您可以做的一个技巧是将显式ForceSnapshot命令添加到持久参与者的协议中。这在经典API中非常容易,在那里您可以对快照进行更多的控制,所以我就不讨论了。然而,在EventSourcedBehavior类型的API中,如果事件是SnapshotForced,则添加SnapshotForced事件并更改snapshotWhen函数以返回true需要更加精细。

无论是Classic还是Typed,您都可以让应用程序强制每个persistenceId使用currentPersistenceIdsPersistenceQuery:来编写新的快照

val readJournal =
PersistenceQuery(system).readJournalFor[JdbcReadJournal](JdbcReadJournal.Identifier)
val everythingSnapshotted =
readJournal.currentPersistenceIds()
.mapAsync(parallelism) { id =>
// use the ask pattern to send a `ForceSnapshot` command and wait for reply
???
}
.runWith(Sink.ignore)

最新更新