如何通过出站通道适配器传递与消息关联的对象



我有以下内容:

[inbound channel adapter] -> ... -> foo -> [outbound channel adapter] -> bar

我如何编写我的spring集成应用程序,以便foo可以添加一个不属于[outbound channel adapter]要使用的消息的额外对象,从而使bar获得它?

我的应用程序基本上接收来自AWS SQS的消息(使用spring-integration-aws(,进行一些过滤/转换,然后将消息发布到Apache Kafka(使用spring-integration-kafka(,如果并且仅当成功,则从SQS队列中删除原始消息。

因此,当我接收到SQS消息时,我想保留receivehandle/acknowledgement对象,将消息的其余部分转换为要发布的Kafka消息,然后如果成功,则使用该receivehandler/acknowledge对象将原始消息出列。

假设我在spring-integration-kafka文档中使用以下示例代码:

@Bean
@ServiceActivator(inputChannel = "toKafka", outputChannel = "result")
public MessageHandler handler() throws Exception {
KafkaProducerMessageHandler<String, String> handler =
new KafkaProducerMessageHandler<>(kafkaTemplate());
handler.setTopicExpression(new LiteralExpression("someTopic"));
handler.setMessageKeyExpression(new LiteralExpression("someKey"));
handler.setFailureChannel(failures());
return handler;
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
@Bean
public ProducerFactory<String, String> producerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.brokerAddress);
// set more properties
return new DefaultKafkaProducerFactory<>(props);
}

如上所述,如果我有一条消息message和一些额外的、不相关的信息extra,我该向toKafka信道发送什么,以便handler消耗message,如果成功,result信道将接收extra

出站通道适配器不产生输出-它们只是单向的,并结束流。

您可以将toKafka设置为PublishSubscribeChannel,并添加第二个服务激活器;默认情况下,只有在第一个成功的情况下才会调用第二个。

最新更新