Spring集成:使用Java DSL定义消息的顺序处理



对于我认为相当常见的集成流程,我无法找到解决方案:

  1. 从源文件
  2. 读取文件
  3. 过程文件
  4. 删除源文件如果处理成功。

目前,我有一个使用PublishSubscribeChannel的IntegrationFlow,其中两个IntegrationFlow作为订阅者:一个处理文件,一个删除文件。不幸的是,不管第一个(进程)的结果如何,后一个(删除)都会执行,即使是"进程"。flow抛出异常。

我需要的是一个顺序处理流程,但我不知道如何实现。创建了一些测试代码,但是没有工作,报告

2022-11-22 09:55:54.256 ERROR 14648 --- [   scheduling-1]
o.s.integration.handler.LoggingHandler   :
org.springframework.messaging.MessagingException: Failed to 
invoke method; nested exception is 
java.lang.IllegalArgumentException: wrong number of arguments

实验室代码:

@Configuration
@EnableIntegration
public class SeqChannels {
@Bean
public AtomicInteger integerSource() {
return new AtomicInteger();
}
@InboundChannelAdapter(channel = "process", poller = @Poller(fixedDelay = "1000"))
public Message<Integer> source(final AtomicInteger integerSource) {
return MessageBuilder.withPayload(integerSource.incrementAndGet()).build();
}
@ServiceActivator(inputChannel = "process", outputChannel = "delete")
public Integer process(@Payload Integer message) {
return message;
}
@ServiceActivator(inputChannel = "delete")
public void delete(@Payload Integer message) {
}
}

不能在@InboundChannelAdapter方法上设置参数。这工作…

@Configuration
@EnableIntegration
class SeqChannels {
AtomicInteger integerSource = new AtomicInteger();
@InboundChannelAdapter(channel = "process", poller = @Poller(fixedDelay = "1000"))
public Message<Integer> source() {
return MessageBuilder.withPayload(this.integerSource.incrementAndGet()).build();
}
@ServiceActivator(inputChannel = "process", outputChannel = "delete")
public Integer process(@Payload Integer message) {
System.out.println("Process: " + message);
return message;
}
@ServiceActivator(inputChannel = "delete")
public void delete(@Payload Integer message) {
System.out.println("delete: " + message);
}
}
Process: 1
delete: 1
Process: 2
delete: 2
Process: 3
...

最新更新