java.lang.ClassCastException:类 .$Proxy 143 不能强制转换为类。消息通道 (...在加载器"应用程序"的未命名模块中)



我正在为Spring Cloud Stream应用程序编写测试。这有一个从主题 A 读取的 KStream。在测试中,我使用 KafkaTemplate 发布消息并等待 KStream 日志显示。

测试会引发以下异常:

java.lang.ClassCastException: class com.sun.proxy.$Proxy143 cannot be cast to class org.springframework.messaging.MessageChannel (com.sun.proxy.$Proxy143 and org.springframework.messaging.MessageChannel are in unnamed module of loader 'app')
at org.springframework.cloud.stream.test.binder.TestSupportBinder.bindConsumer(TestSupportBinder.java:66) ~[spring-cloud-stream-test-support-3.0.1.RELEASE.jar:3.0.1.RELEASE]
at org.springframework.cloud.stream.binding.BindingService.doBindConsumer(BindingService.java:169) ~[spring-cloud-stream-3.0.2.BUILD-SNAPSHOT.jar:3.0.2.BUILD-SNAPSHOT]
at org.springframework.cloud.stream.binding.BindingService.bindConsumer(BindingService.java:115) ~[spring-cloud-stream-3.0.2.BUILD-SNAPSHOT.jar:3.0.2.BUILD-SNAPSHOT]
at org.springframework.cloud.stream.binding.AbstractBindableProxyFactory.createAndBindInputs(AbstractBindableProxyFactory.java:112) ~[spring-cloud-stream-3.0.2.BUILD-SNAPSHOT.jar:3.0.2.BUILD-SNAPSHOT]
at org.springframework.cloud.stream.binding.InputBindingLifecycle.doStartWithBindable(InputBindingLifecycle.java:58) ~[spring-cloud-stream-3.0.2.BUILD-SNAPSHOT.jar:3.0.2.BUILD-SNAPSHOT]
at java.base/java.util.LinkedHashMap$LinkedValues.forEach(LinkedHashMap.java:608) ~[na:na]

此异常不会显示在应用程序的正常执行中。

溪流:

@Configuration
class MyKStream() {
private val logger = LoggerFactory.getLogger(javaClass)
@Bean
fun processSomething(): Consumer<KStream<XX, XX>> {
return Consumer { something ->
something.foreach { key, value ->
logger.info("--------> Processing xxx key {} - value {}", key, value)
}
}

测试:

@TestInstance(PER_CLASS)
@EmbeddedKafka
@SpringBootTest(properties = [
"spring.profiles.active=local",
"schema-registry.user=",
"schema-registry.password=",
"spring.cloud.stream.bindings.processSomething-in-0.destination=topicA",
"spring.cloud.stream.bindings.processSomething-in-0.producer.useNativeEncoding=true",
"spring.cloud.stream.bindings.processSomethingElse-in-0.destination=topicB",
"spring.cloud.stream.bindings.processSomethingElse-in-0.producer.useNativeEncoding=true",
"spring.cloud.stream.kafka.streams.binder.configuration.application.server=localhost:8080",
"spring.cloud.stream.function.definition=processSomething;processSomethingElse"])
class MyKStreamTests {
private val logger = LoggerFactory.getLogger(javaClass)
@Autowired
private lateinit var embeddedKafka: EmbeddedKafkaBroker
@Autowired
private lateinit var schemaRegistryMock: SchemaRegistryMock
@AfterAll
fun afterAll() {
embeddedKafka.kafkaServers.forEach { it.shutdown() }
embeddedKafka.kafkaServers.forEach { it.awaitShutdown() }
}
@Test
fun `should send and process something`() {
val producer = createProducer()
logger.debug("**********----> presend")
val msg = MessageBuilder.withPayload(xxx)
.setHeader(KafkaHeaders.MESSAGE_KEY, xxx)
.setHeader(KafkaHeaders.TIMESTAMP, 1L)
.build()
producer.send(msg).get()
logger.debug("**********----> sent")
Thread.sleep(100000)
}
}
@Configuration
class KafkaTestConfiguration(private val embeddedKafkaBroker: EmbeddedKafkaBroker) {
private val schemaRegistryMock = SchemaRegistryMock()
@PostConstruct
fun init() {
System.setProperty("spring.kafka.bootstrap-servers", embeddedKafkaBroker.brokersAsString)
System.setProperty("spring.cloud.stream.kafka.streams.binder.brokers", embeddedKafkaBroker.brokersAsString)
schemaRegistryMock.start()
System.setProperty("spring.cloud.stream.kafka.streams.binder.configuration.schema.registry.url", schemaRegistryMock.url)
}
@Bean
fun schemaRegistryMock(): SchemaRegistryMock {
return schemaRegistryMock
}
@PreDestroy
fun preDestroy() {
schemaRegistryMock.stop()
}
}

您可能正在使用spring-cloud-stream-test-support作为依赖项,并且此依赖项绕过了绑定程序 API 的某些核心功能,从而导致此错误。

https://cloud.spring.io/spring-cloud-static/spring-cloud-stream/3.0.3.RELEASE/reference/html/spring-cloud-stream.html#_testing

最新更新