如何在Spring Integration Flow中测试ApplicationEvent



在我的Spring项目(WebFlux/Kotlin Coroutines/Java 17(中,我定义了一个这样的bean。

@Bean
fun sftpInboundFlow(): IntegrationFlow {
return IntegrationFlows
.from(
Sftp.inboundAdapter(sftpSessionFactory())
.preserveTimestamp(true)
.deleteRemoteFiles(true) // delete files after transfer is done successfully
.remoteDirectory(sftpProperties.remoteDirectory)
.regexFilter(".*\.csv$")
// local settings
.localFilenameExpression("#this.toUpperCase() + '.csv'")
.autoCreateLocalDirectory(true)
.localDirectory(File("./sftp-inbound"))
) { e: SourcePollingChannelAdapterSpec ->
e.id("sftpInboundAdapter")
.autoStartup(true)
.poller(Pollers.fixedDelay(5000))
}
/*            .handle { m: Message<*> ->
run {
val file = m.payload as File
log.debug("payload: ${file}")
applicationEventPublisher.publishEvent(ReceivedEvent(file))
}
}*/
.transform<File, DownloadedEvent> { DownloadedEvent(it) }
.handle(downloadedEventMessageHandler())
.get()
}
@Bean
fun downloadedEventMessageHandler(): ApplicationEventPublishingMessageHandler {
val handler = ApplicationEventPublishingMessageHandler()
handler.setPublishPayload(true)
return handler
}

并编写一个用于断言应用程序事件的测试。

@OptIn(ExperimentalCoroutinesApi::class)
@SpringBootTest(
classes = [SftpIntegrationFlowsTestWithEmbeddedSftpServer.TestConfig::class]
)
@TestPropertySource(
properties = [
"sftp.hostname=localhost",
"sftp.port=2222",
"sftp.user=user",
"sftp.privateKey=classpath:META-INF/keys/sftp_rsa",
"sftp.privateKeyPassphrase=password",
"sftp.remoteDirectory=${SftpTestUtils.sftpTestDataDir}",
"logging.level.org.springframework.integration.sftp=TRACE",
"logging.level.org.springframework.integration.file=TRACE",
"logging.level.com.jcraft.jsch=TRACE"
]
)
@RecordApplicationEvents
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
class SftpIntegrationFlowsTestWithEmbeddedSftpServer {
companion object {
private val log = LoggerFactory.getLogger(SftpIntegrationFlowsTestWithEmbeddedSftpServer::class.java)
}
@Configuration
@Import(
value = [
SftpIntegrationFlows::class,
IntegrationConfig::class
]
)
@ImportAutoConfiguration(
value = [
IntegrationAutoConfiguration::class
]
)
@EnableConfigurationProperties(value = [SftpProperties::class])
class TestConfig {
@Bean
fun embeddedSftpServer(sftpProperties: SftpProperties): EmbeddedSftpServer {
val sftpServer = EmbeddedSftpServer()
sftpServer.setPort(sftpProperties.port ?: 22)
//sftpServer.setHomeFolder()
return sftpServer
}
@Bean
fun remoteFileTemplate(sessionFactory: SessionFactory<LsEntry>) = RemoteFileTemplate(sessionFactory)
}
@Autowired
lateinit var uploadGateway: UploadGateway
@Autowired
lateinit var embeddedSftpServer: EmbeddedSftpServer
@Autowired
lateinit var template: RemoteFileTemplate<LsEntry>
@Autowired
lateinit var applicationEvents: ApplicationEvents
@BeforeAll
fun setup() {
embeddedSftpServer.start()
}
@AfterAll
fun teardown() {
embeddedSftpServer.stop()
}
@Test
//@Disabled("application events can not be tracked in this integration tests")
fun `download the processed ach batch files to local directory`() = runTest {
val testFilename = "foo.csv"
SftpTestUtils.createTestFiles(template, testFilename)
eventually(10.seconds) {
// applicationEvents.stream().forEach{ log.debug("published event:$it")}
applicationEvents.stream(DownloadedEvent::class.java).count() shouldBe 1
SftpTestUtils.fileExists(template, testFilename) shouldBe false
SftpTestUtils.cleanUp(template)
}
}
}

它无法通过ApplicationEvents捕获应用程序事件。

我试图用构造函数自动连接的ApplicationEventPublisher替换ApplicationEventPublishingMessageHandler,但它也不能按预期工作。

检查完整的测试源代码:SftpIntegrationFlowsTestWithEmbeddedSftpServer

更新:applicationEvents在异步线程中不工作,无论是在侦听器方法上应用@Async,还是在异步线程调用applicationEvents,应用程序事件记录都没有按预期工作。

我不熟悉@RecordApplicationEvents,所以我会在支持@Configuration中注册一个@EventListener(File payload),并使用一些异步屏障来等待来自该计划任务的事件。

您可以打开org.springframework.integration的调试日志和消息历史记录,以在日志中查看消息的传播方式。根据您的SFTP状态,如果有一个。

最新更新