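I need to use a KTable with the Spring Kafka binder configuration. Below is sample code that reads data from a topic and prints it to the console.
However, the application terminates at startup with java.lang.IllegalArgumentException: Method must be declarative.
I referred to "Spring Cloud Stream Kafka - Method must be declarative", but the result is still the same.
Java 11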
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.4.4</version>
</parent>
<properties>
<java.version>11</java.version>
<spring-cloud.version>2020.0.2</spring-cloud.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-streams</artifactId>
</dependency>
</dependencies>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>${spring-cloud.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
Binding:
public interface WordCountBinding {
@Input(value = "data-input-channel")
public KTable<String, String> readDataStream();
}
Listener:
@Service
@EnableBinding(value = WordCountBinding.class)
public class WordCountListener {
@StreamListener(value = "data-input-channel")
public void listen(KTable<String, String> data) {
KStream<String, String> wordStream = data.filter((key,value) -> key.contains("SRS")).toStream();
wordStream.foreach((key, value) -> System.out.println("Key: " + key + " Value: " + value));
}
}
Console output:
Error starting ApplicationContext. To display the conditions report re-run your application with 'debug' enabled.
2021-03-19 21:26:53.598 ERROR 1912 --- [ main] o.s.boot.SpringApplication : Application run failed
java.lang.IllegalArgumentException: Method must be declarative
at org.springframework.util.Assert.isTrue(Assert.java:121) ~[spring-core-5.3.5.jar:5.3.5]
at org.springframework.cloud.stream.binder.kafka.streams.KafkaStreamsStreamListenerSetupMethodOrchestrator.validateStreamListenerMethod(KafkaStreamsStreamListenerSetupMethodOrchestrator.java:434) ~[spring-cloud-stream-binder-kafka-streams-3.1.2.jar:3.1.2]
at org.springframework.cloud.stream.binder.kafka.streams.KafkaStreamsStreamListenerSetupMethodOrchestrator.orchestrateStreamListenerSetupMethod(KafkaStreamsStreamListenerSetupMethodOrchestrator.java:161) ~[spring-cloud-stream-binder-kafka-streams-3.1.2.jar:3.1.2]
at org.springframework.cloud.stream.binding.StreamListenerAnnotationBeanPostProcessor.doPostProcess(StreamListenerAnnotationBeanPostProcessor.java:232) ~[spring-cloud-stream-3.1.2.jar:3.1.2]
at org.springframework.cloud.stream.binding.StreamListenerAnnotationBeanPostProcessor.lambda$postProcessAfterInitialization$0(StreamListenerAnnotationBeanPostProcessor.java:202) ~[spring-cloud-stream-3.1.2.jar:3.1.2]
at java.base/java.lang.Iterable.forEach(Iterable.java:75) ~[na:na]
at org.springframework.cloud.stream.binding.StreamListenerAnnotationBeanPostProcessor.injectAndPostProcessDependencies(StreamListenerAnnotationBeanPostProcessor.java:336) ~[spring-cloud-stream-3.1.2.jar:3.1.2]
at org.springframework.cloud.stream.binding.StreamListenerAnnotationBeanPostProcessor.afterSingletonsInstantiated(StreamListenerAnnotationBeanPostProcessor.java:118) ~[spring-cloud-stream-3.1.2.jar:3.1.2]
at org.springframework.beans.factory.support.DefaultListableBeanFactory.preInstantiateSingletons(DefaultListableBeanFactory.java:963) ~[spring-beans-5.3.5.jar:5.3.5]
at org.springframework.context.support.AbstractApplicationContext.finishBeanFactoryInitialization(AbstractApplicationContext.java:918) ~[spring-context-5.3.5.jar:5.3.5]
at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:583) ~[spring-context-5.3.5.jar:5.3.5]
at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:769) ~[spring-boot-2.4.4.jar:2.4.4]
at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:761) ~[spring-boot-2.4.4.jar:2.4.4]
at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:426) ~[spring-boot-2.4.4.jar:2.4.4]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:326) ~[spring-boot-2.4.4.jar:2.4.4]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1313) ~[spring-boot-2.4.4.jar:2.4.4]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1302) ~[spring-boot-2.4.4.jar:2.4.4]
at com.shasr.streamwordcount.StreamWordcountApplication.main(StreamWordcountApplication.java:10) ~[classes/:na]
2021-03-19 21:26:53.613 INFO 1912 --- [ main] o.s.s.c.ThreadPoolTaskScheduler : Shutting down ExecutorService 'taskScheduler'
Process finished with exit code 1
You can get rid of that particular exception by declaring the method parameter with @Input, as shown below.
@StreamListener
public void listen(@Input("data-input-channel") KTable<String, String> data) {
// method body unchanged from the question
}
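More importantly, using StreamListener is not recommended, because it has been deprecated since the 3.1.x line of Spring Cloud Stream. The recommended approach going forward is the functional style. See the reference documentation for more details.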
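For illustration, here is a minimal sketch of what the functional style could look like for this use case. It assumes the same dependencies as the pom above and a reachable Kafka broker. The bean name process is a hypothetical choice, not something taken from the question; the class name is borrowed from the stack trace. With this style, the WordCountBinding interface, @EnableBinding, and @StreamListener should no longer be needed.

```java
import java.util.function.Consumer;

import org.apache.kafka.streams.kstream.KTable;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;

@SpringBootApplication
public class StreamWordcountApplication {

    public static void main(String[] args) {
        SpringApplication.run(StreamWordcountApplication.class, args);
    }

    // Assumption: the bean is named "process", so the Kafka Streams binder
    // exposes its input as the binding "process-in-0".
    @Bean
    public Consumer<KTable<String, String>> process() {
        // Same logic as the original listener: keep records whose key
        // contains "SRS" and print every update to the console.
        return data -> data
                .filter((key, value) -> key.contains("SRS"))
                .toStream()
                .foreach((key, value) ->
                        System.out.println("Key: " + key + " Value: " + value));
    }
}
```

The input topic would then be set through the binding name derived from the bean, for example spring.cloud.stream.bindings.process-in-0.destination=data-input-channel in application.properties, assuming data-input-channel is also the name of the topic you want to read from.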