带有KTable Binder配置的Spring Cloud Stream-抛出java.lang.IollegalAr



需要将KTable与Spring Kafka绑定器配置一起使用,下面是用于从主题读取数据并将其打印在控制台上的示例代码。

但是应用程序以Throw java.lang.IollegalArgumentException终止:方法在应用程序启动时必须是声明性的

引用的Spring Cloud Stream Kafka-方法必须是声明的,但结果仍然是相同的

Java-11

<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.4.4</version>
</parent>
<properties>
<java.version>11</java.version>
<spring-cloud.version>2020.0.2</spring-cloud.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-streams</artifactId>
</dependency>
</dependencies>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>${spring-cloud.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>

活页夹:

public interface WordCountBinding {
@Input(value = " data-input-channel")
public KTable<String, String> readDataStream();
}

监听器

@Service
@EnableBinding(value = WordCountBinding.class)
public class WordCountListener {
@StreamListener(value = "data-input-channel")
public void listen(KTable<String, String> data) {
KStream<String, String> wordStream = data.filter((key,value) -> key.contains("SRS")).toStream();
wordStream.foreach((key, value) -> System.out.println("Key: " + key + " Value: " + value));
}
}

控制台输出:

Error starting ApplicationContext. To display the conditions report re-run your application with 'debug' enabled.
2021-03-19 21:26:53.598 ERROR 1912 --- [           main] o.s.boot.SpringApplication               : Application run failed
java.lang.IllegalArgumentException: Method must be declarative
at org.springframework.util.Assert.isTrue(Assert.java:121) ~[spring-core-5.3.5.jar:5.3.5]
at org.springframework.cloud.stream.binder.kafka.streams.KafkaStreamsStreamListenerSetupMethodOrchestrator.validateStreamListenerMethod(KafkaStreamsStreamListenerSetupMethodOrchestrator.java:434) ~[spring-cloud-stream-binder-kafka-streams-3.1.2.jar:3.1.2]
at org.springframework.cloud.stream.binder.kafka.streams.KafkaStreamsStreamListenerSetupMethodOrchestrator.orchestrateStreamListenerSetupMethod(KafkaStreamsStreamListenerSetupMethodOrchestrator.java:161) ~[spring-cloud-stream-binder-kafka-streams-3.1.2.jar:3.1.2]
at org.springframework.cloud.stream.binding.StreamListenerAnnotationBeanPostProcessor.doPostProcess(StreamListenerAnnotationBeanPostProcessor.java:232) ~[spring-cloud-stream-3.1.2.jar:3.1.2]
at org.springframework.cloud.stream.binding.StreamListenerAnnotationBeanPostProcessor.lambda$postProcessAfterInitialization$0(StreamListenerAnnotationBeanPostProcessor.java:202) ~[spring-cloud-stream-3.1.2.jar:3.1.2]
at java.base/java.lang.Iterable.forEach(Iterable.java:75) ~[na:na]
at org.springframework.cloud.stream.binding.StreamListenerAnnotationBeanPostProcessor.injectAndPostProcessDependencies(StreamListenerAnnotationBeanPostProcessor.java:336) ~[spring-cloud-stream-3.1.2.jar:3.1.2]
at org.springframework.cloud.stream.binding.StreamListenerAnnotationBeanPostProcessor.afterSingletonsInstantiated(StreamListenerAnnotationBeanPostProcessor.java:118) ~[spring-cloud-stream-3.1.2.jar:3.1.2]
at org.springframework.beans.factory.support.DefaultListableBeanFactory.preInstantiateSingletons(DefaultListableBeanFactory.java:963) ~[spring-beans-5.3.5.jar:5.3.5]
at org.springframework.context.support.AbstractApplicationContext.finishBeanFactoryInitialization(AbstractApplicationContext.java:918) ~[spring-context-5.3.5.jar:5.3.5]
at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:583) ~[spring-context-5.3.5.jar:5.3.5]
at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:769) ~[spring-boot-2.4.4.jar:2.4.4]
at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:761) ~[spring-boot-2.4.4.jar:2.4.4]
at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:426) ~[spring-boot-2.4.4.jar:2.4.4]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:326) ~[spring-boot-2.4.4.jar:2.4.4]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1313) ~[spring-boot-2.4.4.jar:2.4.4]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1302) ~[spring-boot-2.4.4.jar:2.4.4]
at com.shasr.streamwordcount.StreamWordcountApplication.main(StreamWordcountApplication.java:10) ~[classes/:na]
2021-03-19 21:26:53.613  INFO 1912 --- [           main] o.s.s.c.ThreadPoolTaskScheduler          : Shutting down ExecutorService 'taskScheduler'
Process finished with exit code 1

您可以通过使用@Input声明方法参数来消除该特定异常,如下所示。

@StreamListener
public void listen(@Input("data-input-channel") KTable<String, String> data) {

更重要的是,建议不要使用StreamListener,因为它从SpringCloudStream的3.1.x版本开始就被弃用了。今后建议的方法是使用函数式。有关更多详细信息,请参阅参考文档。

相关内容

最新更新