A default binder has been requested, but there are no binders available for 'org.springframework.cloud.stream.messaging.DirectWithAttributesChannel'



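I am trying to create the simplest possible hello world with Spring Cloud + Kafka Streams + Spring Boot 2.

I realize I am missing some basic concepts. Basically, this is what I understand:

1 - I need to define an outbound stream to write messages to a Kafka topic, and an inbound stream to read messages from a Kafka topic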

public interface LoansStreams {

    String INPUT = "loans-in";
    String OUTPUT = "loans-out";

    @Input(INPUT)
    SubscribableChannel inboundLoans();

    @Output(OUTPUT)
    MessageChannel outboundLoans();
}

2 - Configure Spring Cloud Stream to bind to my streams

@EnableBinding(LoansStreams.class)
public class StreamsConfig {
}

3 - Configure the Kafka properties

spring:
  cloud:
    stream:
      kafka:
        binder:
          brokers: localhost:9092
      bindings:
        loans-in:
          destination: loans
          contentType: application/json
        loans-out:
          destination: loans
          contentType: application/json

4 - Create a model for the exchanged messages

@Getter @Setter @ToString @Builder
public class Loans {
    private long timestamp;
    private String result;
}

5 - Write to Kafka

@Service
@Slf4j
public class LoansService {

    private final LoansStreams loansStreams;

    public LoansService(LoansStreams loansStreams) {
        this.loansStreams = loansStreams;
    }

    public void sendLoan(final Loans loans) {
        log.info("Sending loans {}", loans);
        MessageChannel messageChannel = loansStreams.outboundLoans();
        messageChannel.send(MessageBuilder
                .withPayload(loans)
                .setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON)
                .build());
    }
}

6 - Listen to the Kafka topic

@Component
@Slf4j
public class LoansListener {

    @StreamListener(LoansStreams.INPUT)
    public void handleLoans(@Payload Loans loans) {
        log.info("Received results: {}", loans);
    }
}

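I spent a whole day reading several blogs, and I believe the code above should at least work. I am not sure I am coding the best possible approach. In any case, I get the error mentioned in the title: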

Error starting ApplicationContext. To display the conditions report re-run your application with 'debug' enabled.
2019-04-26 18:33:05.619 ERROR 14784 --- [  restartedMain] o.s.boot.SpringApplication               : Application run failed
org.springframework.context.ApplicationContextException: Failed to start bean 'outputBindingLifecycle'; nested exception is java.lang.IllegalStateException: A default binder has been requested, but there are no binders available for 'org.springframework.cloud.stream.messaging.DirectWithAttributesChannel' : , and no default binder has been set.

While googling for a solution, I found someone saying to write the StreamListener so that it returns the model, so I replaced it with:

@StreamListener(LoansStreams.INPUT)
@SendTo("loans-out")
public KStream<?, Loans> process(KStream<?, Loans> l) {
    log.info("Received: {}", l);
    return l;
}

Then I get an error that is, at least to me, even less clear (the previous error clearly pointed to some binder problem):

Error starting ApplicationContext. To display the conditions report re-run your application with 'debug' enabled.
2019-04-26 19:01:06.016 ERROR 13276 --- [  restartedMain] o.s.boot.SpringApplication               : Application run failed
java.lang.IllegalArgumentException: Method must be declarative
at org.springframework.util.Assert.isTrue(Assert.java:118) ~[spring-core-5.1.6.RELEASE.jar:5.1.6.RELEASE]
at org.springframework.cloud.stream.binder.kafka.streams.KafkaStreamsStreamListenerSetupMethodOrchestrator.validateStreamListenerMethod(KafkaStreamsStreamListenerSetupMethodOrchestrator.java:510) ~[spring-cloud-stream-binder-kafka-streams-2.1.2.RELEASE.jar:2.1.2.RELEASE]
at org.springframework.cloud.stream.binder.kafka.streams.KafkaStreamsStreamListenerSetupMethodOrchestrator.orchestrateStreamListenerSetupMethod(KafkaStreamsStreamListenerSetupMethodOrchestrator.java:168) ~[spring-cloud-stream-binder-kafka-streams-2.1.2.RELEASE.jar:2.1.2.RELEASE]
at org.springframework.cloud.stream.binding.StreamListenerAnnotationBeanPostProcessor.doPostProcess(StreamListenerAnnotationBeanPostProcessor.java:226) ~[spring-cloud-stream-2.1.2.RELEASE.jar:2.1.2.RELEASE]

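In case it helps somehow, I want to develop this idea further to apply SAGAS, but that is not the focus of this question. First, I need to get the basics up and running.

*Edited

pom.xml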

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.4.RELEASE</version>
        <relativePath /> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.mybank</groupId>
    <artifactId>kafka-cloud-stream</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>kafka-cloud-stream</name>
    <description>Spring Cloud Stream With Kafka</description>
    <properties>
        <java.version>11</java.version>
        <spring-cloud.version>Greenwich.SR1</spring-cloud.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-streams</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-binder-kafka-streams</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-devtools</artifactId>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-test-support</artifactId>
            <scope>test</scope>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.springframework/spring-web -->
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-web</artifactId>
            <!-- version>5.1.5.RELEASE</version-->
        </dependency>
    </dependencies>
    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>${spring-cloud.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>

If you get "A default binder has been requested, but there are no binders available...", add the dependency shown below.

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>

You can define the default binder in application.yml (or application.properties):

spring:
  cloud:
    stream:
      bindings:
        ...
      default-binder: kafka

In my case, with a different application.properties per context and multiple output bindings, the only way I could fix it was to define a common default binder, for example:

spring:
  cloud:
    stream:
      default-binder: eventhub
      ...

The rest of the binder types are also set individually on each input/output.
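As a rough sketch of what setting the binder on each input/output could look like, the fragment below reuses the loans-in and loans-out binding names from the question and assumes a binder named kafka is available on the classpath. The destinations and binder names are illustrative assumptions, not the answerer's actual configuration.

```yaml
spring:
  cloud:
    stream:
      # fallback used by any binding that does not name a binder itself
      default-binder: kafka
      bindings:
        loans-in:
          destination: loans
          binder: kafka   # per-binding binder selection (assumed value)
        loans-out:
          destination: loans
          binder: kafka   # per-binding binder selection (assumed value)
```

With a single binder on the classpath the per-binding `binder` entries are probably redundant; they matter mainly when several binders are present.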

In the pom file above, you need to use binder-kafka instead of binder-kafka-streams.

So replace

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-binder-kafka-streams</artifactId>
</dependency>

with

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>
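To make the reasoning behind this swap a bit more concrete: as far as I understand it, the Kafka Streams binder only offers binders for KStream/KTable-style targets, while the @Input/@Output channels in the question are MessageChannel targets, so no binder matches them unless the message-channel binder (spring-cloud-stream-binder-kafka) is present or a default binder is set. The plain-Java sketch below is only a toy model of that lookup, not Spring Cloud Stream's actual code; BinderRegistry, its methods, and the string "target kinds" are hypothetical names introduced for illustration.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model of binder lookup. NOT Spring Cloud Stream's real implementation;
// every name in this class is hypothetical.
class BinderRegistry {

    // binder name -> kind of binding target that binder can handle
    private final Map<String, String> targetKindByBinder = new LinkedHashMap<>();

    // stays null when no default binder is configured
    private String defaultBinder;

    void register(String binderName, String targetKind) {
        targetKindByBinder.put(binderName, targetKind);
    }

    void setDefaultBinder(String binderName) {
        this.defaultBinder = binderName;
    }

    // Picks a binder for a binding that did not name one explicitly.
    String resolve(String targetKind) {
        if (defaultBinder != null) {
            return defaultBinder; // an explicit default wins
        }
        if (targetKindByBinder.size() == 1) {
            // a single registered binder is used without further checks
            return targetKindByBinder.keySet().iterator().next();
        }
        // otherwise look for exactly one binder that handles this target kind
        List<String> candidates = new ArrayList<>();
        for (Map.Entry<String, String> entry : targetKindByBinder.entrySet()) {
            if (entry.getValue().equals(targetKind)) {
                candidates.add(entry.getKey());
            }
        }
        if (candidates.size() != 1) {
            throw new IllegalStateException(
                    "A default binder has been requested, but no single binder is available for '"
                            + targetKind + "', and no default binder has been set.");
        }
        return candidates.get(0);
    }
}
```

In this model, registering only "kstream" and "ktable" binders and then calling resolve("MessageChannel") throws, which mirrors the startup failure in the question. Registering a "kafka" binder for "MessageChannel", or calling setDefaultBinder, makes the lookup succeed.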
