如何继续在具有轮询的单个线程上执行聚合



>我创建了一个流,它按状态轮询数据库中的行,验证它们,然后聚合到集合。处理完整个流程后,每行都设置为适当的状态。但是当我使用聚合器与发布策略TimeoutCountSequenceSizeReleaseStrategy,并且经过的时间如此之短时,发布组不会发生。之后,以下轮询发生在另一个线程中,但在消息数量未达到策略中的目标(阈值(之前,不会处理以前的消息组。

我的流代码:

@Bean
public IntegrationFlow testFlow(EntityService entityService,
                                EntityValidator entityValidator,
                                EntityFlowProperties properties,                                       
                                EntityChecker checker) {
    return IntegrationFlows
            .from(getMessageSource(entityService::getByStatus, properties.getMaxRowsPerPoll()),
                    e -> e.poller(getPollerSpec(properties)))
            .split()
            .transform(entityValidator::validate)
            .filter(ValidationStatus<Entity>::isValid, filter ->
                    filter.discardFlow(flow -> flow.handle(entityService::handleValidationErrors)))
            .transform(ValidationStatus<Entity>::getEntity)
            .aggregate(aggregatorSpec -> aggregatorSpec.releaseStrategy(new TimeoutCountSequenceSizeReleaseStrategy(5, 10000)))
            .transform(checker::checkOnSomething)
            .split()
            .transform(CheckResultAware<Entity>::getEntity)
            .handle(entityService::saveAndChangeStatus)
            .get();

我希望在与轮询相同的线程上执行聚合,并且在当前流结束之前不要进行新的轮询。

轮询和聚合之间更改状态的方式不合适。

有没有办法做到这一点?

为什么需要TimeoutCountSequenceSizeReleaseStrategy ; 你的序列是有限的;只需使用默认SimpleSequenceSizeReleaseStrategy

但是,无论如何,TimeoutCountSequenceSizeReleaseStrategy都应根据序列大小发布。

但是,它并不适合您的用例,因为您可能会在商店中留下部分组。

相关内容

  • 没有找到相关文章

最新更新