只读一次日记



我正在使用akka persistence的PersistenceQuery将初始状态加载到管理内容的actor。我希望它在启动时只重播一次,但它一直将这些内容发送到日志中。

14:11:28.405 [rooms-akka.actor.default-dispatcher-4] DEBUG a.p.q.j.l.LiveEventsByPersistenceIdPublisher - request replay for persistenceId [rooms] from [4] to [9223372036854775807] limit [100]
14:11:28.407 [rooms-akka.actor.default-dispatcher-17] DEBUG a.p.q.j.l.LiveEventsByPersistenceIdPublisher - replay completed for persistenceId [rooms], currSeqNo [4]
14:11:31.376 [rooms-akka.actor.default-dispatcher-17] DEBUG a.p.q.j.l.LiveEventsByPersistenceIdPublisher - request replay for persistenceId [rooms] from [4] to [9223372036854775807] limit [100]
14:11:31.377 [rooms-akka.actor.default-dispatcher-17] DEBUG a.p.q.j.l.LiveEventsByPersistenceIdPublisher - replay completed for persistenceId [rooms], currSeqNo [4]
14:11:34.376 [rooms-akka.actor.default-dispatcher-4] DEBUG a.p.q.j.l.LiveEventsByPersistenceIdPublisher - request replay for persistenceId [rooms] from [4] to [9223372036854775807] limit [100]
14:11:34.378 [rooms-akka.actor.default-dispatcher-4] DEBUG a.p.q.j.l.LiveEventsByPersistenceIdPublisher - replay completed for persistenceId [rooms], currSeqNo [4]

这是我为实现它而写的程序。

implicit val mat = ActorMaterializer()(context)
val queries = PersistenceQuery(context.system).readJournalFor[LeveldbReadJournal](
      LeveldbReadJournal.Identifier)
val src: Source[EventEnvelope, NotUsed] = queries.eventsByPersistenceId("rooms", 0L, Long.MaxValue)
val events: Source[Any, NotUsed] = src.map(_.event)
val future = events.runWith(Sink.foreach{
  case x: RoomCreated => process(x)
  case x: RoomDeleted => process(x)
  case x => logger.error(s"Could not spawn $x")
})

我认为你的预期行为和你实际看到的不同之处在于eventsByPersistenceId是一个"实时"流。这意味着它不仅会返回在您提供的偏移范围内开始的事件(您从0开始,然后转到Long.MaxValue,等等),还会在新事件出现时继续向您发送。如果您不想要实时流,请将调用改为currentEventsByPersistenceId。这将只包括到那个时间点(您提出请求的时间)的内容,而不是实时流。这应该是你想要的。

最新更新