Spring Cloud Stream Kafka Binder中的 1输出绑定



在这个页面告诉你不能只做一个输出

,但我需要通过使用Spring Cloud Stream Kafka Binder只做一个输出

我该怎么办?

一些文章说使用org.springframework.cloud.stream.function.StreamBridge但是它不适合我

我让StreamBridge向Kafka发送主题,但是Kafka不为我的Spring启动应用程序生成主题

,这是我的应用程序。生成主题代码

// producer Springboot application  

spring.cloud.stream:
kafka:
binder:
brokers: {AWS.IP}:9092
zkNodes: {AWS.IP}:2181
bindings:
deliveryIncoming:
destination : deliveryIncomingtopic
@Bean
public CommandLineRunner commandLineRunner(ApplicationContext ctx) {
// wanna produce deliveryIncomingtopic and send to Spring's Consumer
}

// Consumer Springboot application
spring : 
cloud:
stream:
kafka:
binder: 
brokers: {AWS.IP}:9092
zkNodes: {AWS.IP}:2181
function:
definition : deliveryIncoming;
bindings:
deliveryIncoming-in-0:
destination : deliveryIncomingtopic
@Bean
public Consumer<KStream<String, String>> deliveryIncoming() { 
return input ->
input.foreach((key, value) -> {
System.out.println("deliveryIncoming is playing");
System.out.println("Key: " + key + " Value: " + value);
}); 
}
<标题>

编辑对不起,我想我写得有点不清楚

我只是想在

生产(deliveryIncomingtopic)→卡夫卡→消费者(deliveryIncomingtopic)

如果是这种情况,那么您需要更改bean函数定义以返回java.util.Function而不是java.util.Consumer

@Bean
public Function<KStream<String, String>, KStream<String, String>> deliveryIncoming() {
return input ->
input.foreach((key, value) -> {
System.out.println("deliveryIncoming is playing");
System.out.println("Key: " + key + " Value: " + value);
}); 
}

然而,AFAIK . .您仍然需要在application.yml中定义输出通道。。您可以使用不同后缀的相同名称。如下所示:

deliveryIncoming-in-0:
destination: <your_topic_name>
deliveryIncoming-out-0:
destination: <your_topic_name>

我想在这里把事情说清楚。您是否希望从入站主题deliveryIncomingtopic中消费消息,然后生成/生成另一个消息到另一个输出主题?如果这是你的问题,那么我相信你在你的application.yml中遗漏了一些东西。您需要为输出主题配置另一个配置。例如:

既然你正在使用Spring Cloud Function(基于我在application.yml中看到的)),那么您应该为您的输出主题添加更多配置,如下所示:

spring : 
cloud:
stream:
kafka:
binder: 
brokers: {AWS.IP}:9092
zkNodes: {AWS.IP}:2181
function:
definition: deliveryIncoming;deliveryOutput
bindings:
deliveryIncoming-in-0:
destination: deliveryIncomingtopic
deliveryOutput-out-0:
destination: deliveryOutput

并为您的生产者函数定义另一个bean:

@Bean
public Producer<KStream<String, String>> deliveryOutput() { 
// do your necessary logic here to outbound your message
}

希望能达到你的期望。

最新更新