如何在Spring Integration Reactor中创建反应式入站通道适配器



我想了解如何创建一个反应式通道适配器,用于与Reactor核心的Spring集成。我从我读过的其他论坛中了解到,这个Mongo-DB反应式适配器可以是一个很好的例子,但它包含许多Mongo域特定的类。

我已经阅读了文档的Reactive部分,我发现有必要实现MessageProducerSupport,但从代码示例来看,似乎有必要实现一个扩展MessageProducerSpec并调用第一个的类。有人能举一个最基本用法的例子,并解释创建这样一个通道适配器的真正需求是什么吗?我知道我应该做的是:

public IntegrationFlow buildPipe() {
return IntegrationFlows.from(myMessageProducerSpec)
.handle(reactiveMongoDbStoringMessageHandler, "handleMessage")
.handle(writeToKafka)
.get();
}

MessageProducerSpec适用于Java DSL。它和通道适配器的低级逻辑无关。如果你有一个MessageProducerSupport,那么这个就足够好了,可以在流定义中使用:

/**
* Populate the provided {@link MessageProducerSupport} object to the {@link IntegrationFlowBuilder} chain.
* The {@link org.springframework.integration.dsl.IntegrationFlow} {@code startMessageProducer}.
* @param messageProducer the {@link MessageProducerSupport} to populate.
* @return new {@link IntegrationFlowBuilder}.
*/
public static IntegrationFlowBuilder from(MessageProducerSupport messageProducer) {

请参阅有关Java DSL中任意通道适配器使用的文档中的更多信息:https://docs.spring.io/spring-integration/docs/current/reference/html/dsl.html#java-dsl协议适配器

但还是要说一次:忘记您的通道适配器的Java DSL API。首先实现该通道适配器。是的,反应式MessageProducerSupport必须在其doStart()实现中使用subscribeToPublisher()。从源系统构建Flux的其余逻辑取决于您和您将要依赖的库

还有一个ReactiveRedisStreamMessageProducerZeroMqMessageProducer,但我不能说它们的代码比上面提到的MongoDbChangeStreamMessageProducer更容易消化。

相关内容

最新更新