我们将akka persistence 2.6.15中的EventSourcedBehavior用于CQRS/EventSourcing应用程序,并将akka persistence jdbc 4.0.0用于在PostgreSQL数据库中存储事件和快照。
我们有用快照序列化的状态类。但有时这些状态类会发生变化,这使得读取快照明显失败。我们通过删除那些更改的快照来管理它:
delete from snapshot sn
where sn.persistence_id::uuid in (select id from some_entity_table);
但对于具有大量事件的实体,在发送新命令时,需要花费大量时间才能获取最新的快照,从而导致超时。
是否可以在应用程序启动时强制重新生成快照?
可以说;真";解决方案是使用SerializerWithStringManifest
,它可以将以前的快照格式反序列化为当前格式。如果使用反射驱动的序列化,这可能会更加困难。它也不会迁移现有的快照。
您可以做的一个技巧是将显式ForceSnapshot
命令添加到持久参与者的协议中。这在经典API中非常容易,在那里您可以对快照进行更多的控制,所以我就不讨论了。然而,在EventSourcedBehavior
类型的API中,如果事件是SnapshotForced
,则添加SnapshotForced
事件并更改snapshotWhen
函数以返回true
需要更加精细。
无论是Classic还是Typed,您都可以让应用程序强制每个persistenceId使用currentPersistenceIds
PersistenceQuery:来编写新的快照
val readJournal =
PersistenceQuery(system).readJournalFor[JdbcReadJournal](JdbcReadJournal.Identifier)
val everythingSnapshotted =
readJournal.currentPersistenceIds()
.mapAsync(parallelism) { id =>
// use the ask pattern to send a `ForceSnapshot` command and wait for reply
???
}
.runWith(Sink.ignore)