我想要我的Axon重播事件,不是全部,而是部分。完全重播已经启动并运行,但当我想要部分重播时,我需要方法resetTokens()
的TrackingToken startPosition
,我的问题是如何为部分重播获取此令牌?
我试过使用GapAwareTracingToken
,但不起作用。
public void resetTokensWithRestartIndexFor(String trackingEventProcessorName, Long restartIndex) {
eventProcessingConfiguration
.eventProcessorByProcessingGroup(trackingEventProcessorName, TrackingEventProcessor.class)
.filter(trackingEventProcessor -> !trackingEventProcessor.isReplaying())
.ifPresent(trackingEventProcessor -> {
// shutdown this streaming processor
trackingEventProcessor.shutDown();
// reset the tokens to prepare the processor with start index for replay
trackingEventProcessor.resetTokens(GapAwareTrackingToken.newInstance(restartIndex - 1, Collections.emptySortedSet()));
// start the processor to initiate the replay
trackingEventProcessor.start();
});
}
当我使用GapAwareTrackingToken
时,我会得到异常:
[] - Resolved [java.lang.IllegalArgumentException: Incompatible token type provided.]
我看到还有一个GlobalSequenceTrackingToken
我可以使用,但我没有看到任何关于何时可以/应该使用这些的文档。
主"挑战";当进行部分重置时,你需要能够知道重置到哪里。在Axon中,流中的位置是用TrackingToken定义的。
您从中读取的源将为您提供这样一个令牌,其中包含它提供的每个事件。然而,当您进行重置时,您可能在消费这些事件时没有存储相关的令牌。
您也可以使用任何StreamableMessageSource
创建令牌。通常,这是您的事件存储,但如果您从其他来源阅读,它也可能是其他内容。
StreamableMessageSource
提供了4种创建令牌的方法:
createHeadToken
-流最近边缘的位置,在该位置只读取新事件createTailToken
-流最开始的位置,允许您回放所有事件createTokenAt(Instant)
-流中最新的位置,它将返回在给定Instant
上或之后创建的所有事件。请注意,某些事件的时间戳可能仍然早于此时间戳,因为不能保证事件创建和事件存储是相同的createTokenSince(Duration)
-类似于createTokenAt
,但接受一定的时间返回
因此,在您的情况下,createTokenAt
应该起作用。