Axon Partialy重播,我如何获得重播的startPosition的TrackingToken



我想要我的Axon重播事件,不是全部,而是部分。完全重播已经启动并运行,但当我想要部分重播时,我需要方法resetTokens()TrackingToken startPosition,我的问题是如何为部分重播获取此令牌?

我试过使用GapAwareTracingToken,但不起作用。

public void resetTokensWithRestartIndexFor(String trackingEventProcessorName, Long restartIndex) {
eventProcessingConfiguration
.eventProcessorByProcessingGroup(trackingEventProcessorName, TrackingEventProcessor.class)
.filter(trackingEventProcessor -> !trackingEventProcessor.isReplaying())
.ifPresent(trackingEventProcessor -> {
// shutdown this streaming processor
trackingEventProcessor.shutDown();

// reset the tokens to prepare the processor with start index for replay
trackingEventProcessor.resetTokens(GapAwareTrackingToken.newInstance(restartIndex - 1, Collections.emptySortedSet()));

// start the processor to initiate the replay
trackingEventProcessor.start();
});
}

当我使用GapAwareTrackingToken时,我会得到异常:

[] - Resolved [java.lang.IllegalArgumentException: Incompatible token type provided.]

我看到还有一个GlobalSequenceTrackingToken我可以使用,但我没有看到任何关于何时可以/应该使用这些的文档。

主"挑战";当进行部分重置时,你需要能够知道重置到哪里。在Axon中,流中的位置是用TrackingToken定义的。

您从中读取的源将为您提供这样一个令牌,其中包含它提供的每个事件。然而,当您进行重置时,您可能在消费这些事件时没有存储相关的令牌。

您也可以使用任何StreamableMessageSource创建令牌。通常,这是您的事件存储,但如果您从其他来源阅读,它也可能是其他内容。

StreamableMessageSource提供了4种创建令牌的方法:

  • createHeadToken-流最近边缘的位置,在该位置只读取新事件
  • createTailToken-流最开始的位置,允许您回放所有事件
  • createTokenAt(Instant)-流中最新的位置,它将返回在给定Instant上或之后创建的所有事件。请注意,某些事件的时间戳可能仍然早于此时间戳,因为不能保证事件创建和事件存储是相同的
  • createTokenSince(Duration)-类似于createTokenAt,但接受一定的时间返回

因此,在您的情况下,createTokenAt应该起作用。

相关内容

  • 没有找到相关文章

最新更新