你好,我在测试spring-cloud&kafka流并出现错误。
错误日志:
org.springframework.context.ApplicationContextException: Failed to start bean 'inputBindingLifecycle'; nested exception is java.lang.IllegalStateException: A default binder has been requested, but there are no binders available for 'org.springframework.cloud.stream.binder.DefaultPollableMessageSource' : , and no default binder has been set.
at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:185) ~[spring-context-5.2.3.RELEASE.jar:5.2.3.RELEASE]
at org.springframework.context.support.DefaultLifecycleProcessor.access$200(DefaultLifecycleProcessor.java:53) ~[spring-context-5.2.3.RELEASE.jar:5.2.3.RELEASE]
at org.springframework.context.support.DefaultLifecycleProcessor$LifecycleGroup.start(DefaultLifecycleProcessor.java:360) ~[spring-context-5.2.3.RELEASE.jar:5.2.3.RELEASE]
at org.springframework.context.support.DefaultLifecycleProcessor.startBeans(DefaultLifecycleProcessor.java:158) ~[spring-context-5.2.3.RELEASE.jar:5.2.3.RELEASE]
at org.springframework.context.support.DefaultLifecycleProcessor.onRefresh(DefaultLifecycleProcessor.java:122) ~[spring-context-5.2.3.RELEASE.jar:5.2.3.RELEASE]
at org.springframework.context.support.AbstractApplicationContext.finishRefresh(AbstractApplicationContext.java:894) ~[spring-context-5.2.3.RELEASE.jar:5.2.3.RELEASE]
at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.finishRefresh(ServletWebServerApplicationContext.java:162) ~[spring-boot-2.2.4.RELEASE.jar:2.2.4.RELEASE]
at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:553) ~[spring-context-5.2.3.RELEASE.jar:5.2.3.RELEASE]
at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.refresh(ServletWebServerApplicationContext.java:141) ~[spring-boot-2.2.4.RELEASE.jar:2.2.4.RELEASE]
at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:747) ~[spring-boot-2.2.4.RELEASE.jar:2.2.4.RELEASE]
at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:397) ~[spring-boot-2.2.4.RELEASE.jar:2.2.4.RELEASE]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:315) ~[spring-boot-2.2.4.RELEASE.jar:2.2.4.RELEASE]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1226) ~[spring-boot-2.2.4.RELEASE.jar:2.2.4.RELEASE]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1215) ~[spring-boot-2.2.4.RELEASE.jar:2.2.4.RELEASE]
at com.learn.SpringKafkaApplication.main(SpringKafkaApplication.java:30) ~[classes/:na]
Caused by: java.lang.IllegalStateException: A default binder has been requested, but there are no binders available for 'org.springframework.cloud.stream.binder.DefaultPollableMessageSource' : , and no default binder has been set.
at org.springframework.cloud.stream.binder.DefaultBinderFactory.doGetBinder(DefaultBinderFactory.java:183) ~[spring-cloud-stream-3.0.3.RELEASE.jar:3.0.3.RELEASE]
at org.springframework.cloud.stream.binder.DefaultBinderFactory.getBinder(DefaultBinderFactory.java:134) ~[spring-cloud-stream-3.0.3.RELEASE.jar:3.0.3.RELEASE]
at org.springframework.cloud.stream.binding.BindingService.getBinder(BindingService.java:362) ~[spring-cloud-stream-3.0.3.RELEASE.jar:3.0.3.RELEASE]
at org.springframework.cloud.stream.binding.BindingService.bindConsumer(BindingService.java:95) ~[spring-cloud-stream-3.0.3.RELEASE.jar:3.0.3.RELEASE]
at org.springframework.cloud.stream.binding.AbstractBindableProxyFactory.createAndBindInputs(AbstractBindableProxyFactory.java:112) ~[spring-cloud-stream-3.0.3.RELEASE.jar:3.0.3.RELEASE]
at org.springframework.cloud.stream.binding.InputBindingLifecycle.doStartWithBindable(InputBindingLifecycle.java:58) ~[spring-cloud-stream-3.0.3.RELEASE.jar:3.0.3.RELEASE]
at java.base/java.util.LinkedHashMap$LinkedValues.forEach(LinkedHashMap.java:608) ~[na:na]
at org.springframework.cloud.stream.binding.AbstractBindingLifecycle.start(AbstractBindingLifecycle.java:57) ~[spring-cloud-stream-3.0.3.RELEASE.jar:3.0.3.RELEASE]
at org.springframework.cloud.stream.binding.InputBindingLifecycle.start(InputBindingLifecycle.java:34) ~[spring-cloud-stream-3.0.3.RELEASE.jar:3.0.3.RELEASE]
at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:182) ~[spring-context-5.2.3.RELEASE.jar:5.2.3.RELEASE]
... 14 common frames omitted
这是我的代码:
1
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.binder.PollableMessageSource;
public interface SynSink {
@Input("input")
PollableMessageSource source();
}
2
import com.learn.bind.SynSink;
import com.learn.model.Employee;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.binder.PollableMessageSource;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.core.ParameterizedTypeReference;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
@SpringBootApplication(scanBasePackages = "com.learn.*")
@EnableBinding({Source.class, SynSink.class})
@EnableScheduling
public class SpringKafkaApplication {
@Autowired
private PollableMessageSource messageSource;
public static void main(String[] args) {
SpringApplication.run(SpringKafkaApplication.class, args);
}
@Scheduled(fixedRate = 5000)
public void getMessage() {
messageSource.poll(m -> System.out.print(m.getPayload()), new
ParameterizedTypeReference<Employee>() {
});
}
}
正如预期的那样,类路径中有多个绑定器:
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-streams</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>
你可以删除卡夫卡流,如果你不使用它。
或者在你的应用程序中指定你的默认活页夹。properties:
spring.cloud.stream.default-binder=kafka
有关更多信息,请参阅spring cloud stream文档中的多个绑定器部分。