>我创建了一个流,它按状态轮询数据库中的行,验证它们,然后聚合到集合。处理完整个流程后,每行都设置为适当的状态。但是当我使用聚合器与发布策略TimeoutCountSequenceSizeReleaseStrategy
,并且经过的时间如此之短时,发布组不会发生。之后,以下轮询发生在另一个线程中,但在消息数量未达到策略中的目标(阈值(之前,不会处理以前的消息组。
我的流代码:
@Bean
public IntegrationFlow testFlow(EntityService entityService,
EntityValidator entityValidator,
EntityFlowProperties properties,
EntityChecker checker) {
return IntegrationFlows
.from(getMessageSource(entityService::getByStatus, properties.getMaxRowsPerPoll()),
e -> e.poller(getPollerSpec(properties)))
.split()
.transform(entityValidator::validate)
.filter(ValidationStatus<Entity>::isValid, filter ->
filter.discardFlow(flow -> flow.handle(entityService::handleValidationErrors)))
.transform(ValidationStatus<Entity>::getEntity)
.aggregate(aggregatorSpec -> aggregatorSpec.releaseStrategy(new TimeoutCountSequenceSizeReleaseStrategy(5, 10000)))
.transform(checker::checkOnSomething)
.split()
.transform(CheckResultAware<Entity>::getEntity)
.handle(entityService::saveAndChangeStatus)
.get();
我希望在与轮询相同的线程上执行聚合,并且在当前流结束之前不要进行新的轮询。
在轮询和聚合之间更改状态的方式不合适。
有没有办法做到这一点?
为什么需要TimeoutCountSequenceSizeReleaseStrategy
; 你的序列是有限的;只需使用默认SimpleSequenceSizeReleaseStrategy
。
但是,无论如何,TimeoutCountSequenceSizeReleaseStrategy
都应根据序列大小发布。
但是,它并不适合您的用例,因为您可能会在商店中留下部分组。