我有以下内容:
[inbound channel adapter] -> ... -> foo -> [outbound channel adapter] -> bar
我如何编写我的spring集成应用程序,以便foo
可以添加一个不属于[outbound channel adapter]
要使用的消息的额外对象,从而使bar
获得它?
我的应用程序基本上接收来自AWS SQS的消息(使用spring-integration-aws
(,进行一些过滤/转换,然后将消息发布到Apache Kafka(使用spring-integration-kafka
(,如果并且仅当成功,则从SQS队列中删除原始消息。
因此,当我接收到SQS消息时,我想保留receivehandle/acknowledgement对象,将消息的其余部分转换为要发布的Kafka消息,然后如果成功,则使用该receivehandler/acknowledge对象将原始消息出列。
假设我在spring-integration-kafka
文档中使用以下示例代码:
@Bean
@ServiceActivator(inputChannel = "toKafka", outputChannel = "result")
public MessageHandler handler() throws Exception {
KafkaProducerMessageHandler<String, String> handler =
new KafkaProducerMessageHandler<>(kafkaTemplate());
handler.setTopicExpression(new LiteralExpression("someTopic"));
handler.setMessageKeyExpression(new LiteralExpression("someKey"));
handler.setFailureChannel(failures());
return handler;
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
@Bean
public ProducerFactory<String, String> producerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.brokerAddress);
// set more properties
return new DefaultKafkaProducerFactory<>(props);
}
如上所述,如果我有一条消息message
和一些额外的、不相关的信息extra
,我该向toKafka
信道发送什么,以便handler
消耗message
,如果成功,result
信道将接收extra
?
出站通道适配器不产生输出-它们只是单向的,并结束流。
您可以将toKafka
设置为PublishSubscribeChannel
,并添加第二个服务激活器;默认情况下,只有在第一个成功的情况下才会调用第二个。