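Please bear with me if this is a trivial question. I have googled it and have not found an answer.
I am following a tutorial on event-driven development with Spring. At this point, the goal is an event-driven implementation that is broker-agnostic and portable.
For this I have 2 Spring Boot projects: one produces/publishes to the topics, the other consumes them.
Both projects are set up to work with either RabbitMQ or Kafka without any code changes (or at least that is the intention).
With RabbitMQ everything works fine, but with Kafka it does not. The problem seems to be that the Kafka producer adds the project name as a prefix to the topic, and the consumer does not know about this prefix.
Configuration:
Producer project application.yml (relevant parts only)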
spring.cloud.stream:
  bindings:
    output-products:
      destination: products
      producer:
        required-groups: auditGroup
---
spring.config.activate.on-profile: kafka
spring.cloud.stream.kafka.binder.brokers: kafka
management.health.rabbit.enabled: false
spring.cloud.stream.defaultBinder: kafka
spring.zipkin.sender.type: kafka
spring.kafka.bootstrap-servers: kafka:9092
Consumer project application.yml (relevant parts only)
spring.cloud.stream.bindings.input:
  destination: products
  group: productsGroup
---
spring.config.activate.on-profile: kafka
spring.cloud.stream.kafka.binder.brokers: kafka
management.health.rabbit.enabled: false
spring.cloud.stream.defaultBinder: kafka
spring.zipkin.sender.type: kafka
spring.kafka.bootstrap-servers: kafka:9092
For the producer class, I created an interface and declared it as a bean
public interface MessageSources {

    String OUTPUT_PRODUCTS = "output-products";
    String OUTPUT_RECOMMENDATIONS = "output-recommendations";
    String OUTPUT_REVIEWS = "output-reviews";

    @Output(OUTPUT_PRODUCTS)
    MessageChannel outputProducts();

    @Output(OUTPUT_RECOMMENDATIONS)
    MessageChannel outputRecommendations();

    @Output(OUTPUT_REVIEWS)
    MessageChannel outputReviews();
}
Then I use this bean to publish to the topic
@EnableBinding(ProductCompositeIntegration.MessageSources.class)
@Component
public class ProductCompositeIntegration {

    // messageSources is the injected MessageSources bean
    // (its field declaration is omitted from this excerpt)
    public Product createProduct(Product body) {
        messageSources.outputProducts().send(
            MessageBuilder.withPayload(new Event(CREATE, body.getProductId(), body)).build());
        return body;
    }
}
For the consumer class binding I use Sink.class (remember, I want a portable solution)
@EnableBinding(Sink.class)
public class MessageProcessor {

    private static final Logger log = LoggerFactory
        .getLogger(MessageProcessor.class);

    private final ProductService productService;

    @Autowired
    public MessageProcessor(ProductService productService) {
        this.productService = productService;
    }

    @StreamListener(target = Sink.INPUT)
    public void process(Event<Integer, Product> event) {
        log.info("Process message created at {}...", event.getEventCreatedAt());
        switch (event.getEventType()) {
        ....
With everything in place and configured for RabbitMQ, this works fine. But when I try to use Kafka, I get this error:
Dispatcher has no subscribers for channel 'product-composite-1.output-products'
where product-composite is the name of the producer project.
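To make the error message easier to read, here is a minimal plain-Java sketch of a channel that fails when nothing is subscribed to it. It is not Spring Integration's actual code: `ChannelSketch` and its methods are hypothetical names. It assumes (as far as I can tell from the message format) that the quoted name is the application context id plus the channel bean name, which would mean the `product-composite-1.` part identifies the channel inside the application and is not a Kafka topic prefix.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Simplified stand-in for a subscribable message channel (hypothetical, not a Spring class). */
class ChannelSketch {
    private final String fullName;
    private final List<Consumer<String>> subscribers = new ArrayList<>();

    ChannelSketch(String contextId, String channelName) {
        // Assumption: the name shown in the error is "<contextId>.<channelName>".
        this.fullName = contextId + "." + channelName;
    }

    /** Registers a handler, which is roughly what a binder does when it binds the channel. */
    void subscribe(Consumer<String> handler) {
        subscribers.add(handler);
    }

    /** Delivers the payload to every subscriber, or fails if there are none. */
    void send(String payload) {
        if (subscribers.isEmpty()) {
            throw new IllegalStateException(
                "Dispatcher has no subscribers for channel '" + fullName + "'");
        }
        for (Consumer<String> subscriber : subscribers) {
            subscriber.accept(payload);
        }
    }

    public static void main(String[] args) {
        ChannelSketch channel = new ChannelSketch("product-composite-1", "output-products");
        try {
            channel.send("event"); // nothing is bound yet, so this throws
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
        List<String> delivered = new ArrayList<>();
        channel.subscribe(delivered::add); // simulate the binder attaching a producer
        channel.send("event");
        System.out.println("delivered: " + delivered);
    }
}
```

If that reading is right, the error says that no binder attached itself to the `output-products` channel in the producer application, which points at binder setup rather than at the topic name.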
For reference, here is the list of automatically created topics
bash-4.4# kafka-topics.sh --zookeeper zookeeper:2181 --list
__consumer_offsets
error.products.productsGroup
error.recommendations.recommendationsGroup
error.reviews.reviewsGroup
products
recommendations
reviews
zipkin
So it seems that the Kafka library, with auto-configuration enabled, is unable to connect the channel to the topic configured via:
spring.cloud.stream.bindings.<messagechannel>.destination: <topicname>
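As an illustration of how that property ties a channel to a topic, here is a small sketch of the lookup in plain Java. `DestinationLookup` and `resolveDestination` are hypothetical names, and the fallback to the binding name when no destination is set reflects my understanding of the documented default, so treat it as an assumption.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper that mimics how a binding name maps to a destination. */
class DestinationLookup {

    /** Returns the configured destination, or the binding name if none is configured (assumed default). */
    static String resolveDestination(Map<String, String> props, String bindingName) {
        String key = "spring.cloud.stream.bindings." + bindingName + ".destination";
        return props.getOrDefault(key, bindingName);
    }

    public static void main(String[] args) {
        // Producer side, as in the producer application.yml above
        Map<String, String> producer = new HashMap<>();
        producer.put("spring.cloud.stream.bindings.output-products.destination", "products");

        // Consumer side, as in the consumer application.yml above
        Map<String, String> consumer = new HashMap<>();
        consumer.put("spring.cloud.stream.bindings.input.destination", "products");

        System.out.println(resolveDestination(producer, "output-products"));
        System.out.println(resolveDestination(consumer, "input"));
        // "output-unconfigured" is a made-up binding with no destination property
        System.out.println(resolveDestination(producer, "output-unconfigured"));
    }
}
```

With the configuration from the question, both sides resolve to the same `products` destination, which matches the unprefixed `products` topic in the list above.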
The problem was solved after upgrading the library versions:
Not working
plugins {
    id 'org.springframework.boot' version '2.4.0'
    id 'io.spring.dependency-management' version '1.0.10.RELEASE'
    id 'java'
}
group = 'com.example'
version = '0.0.1-SNAPSHOT'
sourceCompatibility = '1.8'
repositories {
    mavenCentral()
    maven { url 'https://repo.spring.io/milestone' }
}
ext {
    set('springCloudVersion', "2020.0.0-M5")
}
dependencies {
    implementation 'org.springframework.boot:spring-boot-starter-amqp'
    implementation 'org.springframework.boot:spring-boot-starter-web'
    implementation 'org.springframework.cloud:spring-cloud-stream'
    implementation 'org.springframework.cloud:spring-cloud-stream-binder-kafka'
    implementation 'org.springframework.cloud:spring-cloud-stream-binder-rabbit'
    implementation 'org.springframework.kafka:spring-kafka'
    testImplementation 'org.springframework.boot:spring-boot-starter-test'
    testImplementation 'org.springframework.amqp:spring-rabbit-test'
    testImplementation 'org.springframework.kafka:spring-kafka-test'
}
dependencyManagement {
    imports {
        mavenBom "org.springframework.cloud:spring-cloud-dependencies:${springCloudVersion}"
    }
}
test {
    useJUnitPlatform()
}
Working
plugins {
    id 'org.springframework.boot' version '2.4.2'
    id 'io.spring.dependency-management' version '1.0.11.RELEASE'
    id 'java'
}
group = 'com.example'
version = '0.0.1-SNAPSHOT'
sourceCompatibility = '1.8'
repositories {
    mavenCentral()
    maven { url 'https://repo.spring.io/milestone' }
}
ext {
    set('springCloudVersion', "2020.0.0")
}
dependencies {
    implementation 'org.springframework.boot:spring-boot-starter-amqp'
    implementation 'org.springframework.boot:spring-boot-starter-web'
    implementation 'org.springframework.cloud:spring-cloud-stream'
    implementation 'org.springframework.cloud:spring-cloud-stream-binder-kafka'
    implementation 'org.springframework.cloud:spring-cloud-stream-binder-rabbit'
    implementation 'org.springframework.kafka:spring-kafka'
    testImplementation 'org.springframework.boot:spring-boot-starter-test'
    testImplementation 'org.springframework.amqp:spring-rabbit-test'
    testImplementation 'org.springframework.kafka:spring-kafka-test'
}
dependencyManagement {
    imports {
        mavenBom "org.springframework.cloud:spring-cloud-dependencies:${springCloudVersion}"
    }
}
test {
    useJUnitPlatform()
}