Spring集成测试a文件.inboundAdapter流



我有这个流,我试图测试,但没有工作如预期。流程本身运行良好,但测试似乎有点棘手。这是我的流程:

@Configuration
@RequiredArgsConstructor
public class FileInboundFlow {
private final ThreadPoolTaskExecutor threadPoolTaskExecutor;
private String filePath;

@Bean
public IntegrationFlow fileReaderFlow() {

return IntegrationFlows.from(Files.inboundAdapter(new File(this.filePath))
.filterFunction(...)
.preventDuplicates(false),
endpointConfigurer -> endpointConfigurer.poller(
Pollers.fixedDelay(500)
.taskExecutor(this.threadPoolTaskExecutor)
.maxMessagesPerPoll(15)))
.transform(new UnZipTransformer())
.enrichHeaders(this::headersEnricher)
.transform(Message.class, this::modifyMessagePayload)
.route(Map.class, this::channelsRouter)
.get();
}
private String channelsRouter(Map<String, File> payload) {
boolean isZip = payload.values()
.stream()
.anyMatch(file -> isZipFile(file));
return isZip ? ZIP_CHANNEL : XML_CHANNEL; // ZIP_CHANNEL and XML_CHANNEL are PublishSubscribeChannel
}

@Bean
public SubscribableChannel xmlChannel() {
var channel = new PublishSubscribeChannel(this.threadPoolTaskExecutor);
channel.setBeanName(XML_CHANNEL);
return channel;
}

@Bean
public SubscribableChannel zipChannel() {
var channel = new PublishSubscribeChannel(this.threadPoolTaskExecutor);
channel.setBeanName(ZIP_CHANNEL);
return channel;
}
//There is a @ServiceActivator on each channel
@ServiceActivator(inputChannel = XML_CHANNEL)
public void handleXml(Message<Map<String, File>> message) {
...
}
@ServiceActivator(inputChannel = ZIP_CHANNEL)
public void handleZip(Message<Map<String, File>> message) {
...
}
//Plus an @Transformer on the XML_CHANNEL
@Transformer(inputChannel = XML_CHANNEL, outputChannel = BUS_CHANNEL)
private List<BusData> xmlFileToIngestionMessagePayload(Map<String, File> xmlFilesByName) {
return xmlFilesByName.values()
.stream()
.map(...)
.collect(Collectors.toList());
}
}

我想测试多个案例,第一个是检查fileReaderFlow结束后在每个通道上发布的消息有效负载。所以我定义了这个测试类:

@SpringBootTest
@SpringIntegrationTest
@ExtendWith(SpringExtension.class)
class FileInboundFlowTest {

@Autowired
private MockIntegrationContext mockIntegrationContext;
@TempDir
static Path localWorkDir;
@BeforeEach
void setUp() {
copyFileToTheFlowDir(); // here I copy a file to trigger the flow
}

@Test
void checkXmlChannelPayloadTest() throws InterruptedException {

Thread.sleep(1000); //waiting for the flow execution
PublishSubscribeChannel xmlChannel = this.getBean(XML_CHANNEL, PublishSubscribeChannel.class); // I extract the channel to listen to the message sent to it.
xmlChannel.subscribe(message -> {
assertThat(message.getPayload()).isInstanceOf(Map.class); // This is never executed
});
}
}

正如预期的那样,测试不工作,因为assertThat(message.getPayload()).isInstanceOf(Map.class);从未执行过。在阅读文档后,我没有找到任何提示来帮助我解决这个问题。任何帮助将不胜感激!谢谢你。

首先,channel.setBeanName(XML_CHANNEL);不会影响目标bean。您在bean创建阶段这样做,依赖注入容器对这个设置一无所知:它只是不咨询它。如果您真的想为bean指定一个XML_CHANNEL名称,您最好查看@Bean(name)属性。

测试中的问题是您忽略了流的异步逻辑的事实。如果完全不同的线程,Files.inboundAdapter()可以工作,并在测试方法之外发出消息。因此,即使您可以在向通道发送任何消息之前及时订阅该通道,这并不意味着您的测试将正确工作:assertThat()将在不同的线程上执行。因此,您的测试方法上下文没有真正的JUnit报告。

所以,我的建议是:

  1. Files.inboundAdapter()在测试开始时停止,然后再进行任何设置。或者至少不要将文件放入filePath,这样通道适配器就不会发出消息。
  2. 从应用程序上下文中获取通道,如果您希望订阅或使用ChannelInterceptor
  3. 设置异步屏障,例如CountDownLatch传递给该用户。
  4. 启动通道适配器或将文件放入目录进行扫描。
  5. 等待异步屏障,然后再验证一些值或状态。

最新更新