没有从句柄调用Spring集成流



我有以下2个组件,应该首先从Mongo删除一个文档,然后从Elastic删除。

主要流程:

@Component
public class DeleteDocumentFlow {
@Autowired
private StoreInMongoFlow storeInMongoFlow;
@Bean
public IntegrationFlow deleteDocument() {
return IntegrationFlows.from(Channels.METADATA_DELETE_STATUS.name())
.handle(storeInMongoFlow.deleteDocumentInMongo())
.channel("deleteDocumentInES.input")
.get();
}
}
服务:

@Component
public class StoreInMongoFlow {
@Bean
public IntegrationFlow deleteDocumentInMongo() {
return flow -> flow.
<Metadata>handle((p, h) -> {
DBObject obj = BasicDBObjectBuilder.start("i", p.getId()).get();
DeleteResult documentEntry = this.mongoTemplate.remove(obj, "docs");
return documentEntry.getDeletedCount();
})
.log(LoggingHandler.Level.INFO, m -> "Number of documents deleted: " + m.getPayload());
}
}

不幸的是,deleteDocumentInMongo永远不会被调用。正如我在日志中看到的那样,bean被正确地注册了。

我做了一些根本性的错误,或者你需要一些更多的调试信息?如果我窃听句柄,则deleteDocumentInES。输入被执行,但是mongo流被忽略。

你肯定做了一些根本性的错误。您尝试将IntegrationFlow视为从handle()调用的服务。这不是什么IntegrationFlow已经被设计。更多信息参见文档:https://docs.spring.io/spring-integration/docs/current/reference/html/dsl.html#java-dsl

DSL提供了一个IntegrationFlow组件来定义通道和通道之间的端点的组合,但是现在IntegrationFlow只扮演配置角色,在应用程序上下文中填充真正的bean,而不是在运行时使用。

如果你声明一个逻辑作为一个单独的IntegrationFlow,你不需要担心handle()-只需使用channel("deleteDocumentInMongo.input")从主流点发送消息到MongoDB子流的第一个通道。

如果你想用Elastic做同样的操作,你应该考虑有一个PublishSubscribeChannel来发送消息和两个从这个通道开始的流。

因为你用log()结束deleteDocumentInMongo流,你不能得到任何回复,你的.channel("deleteDocumentInES.input")将不可达。

请阅读更多文档以了解什么是发布-订阅、请求-应答、服务激活器和流本身。

最新更新