Kafka Streams: Defining multiple Kafka Streams, one per set of topics, using Spring Cloud Stream



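I am trying to build a simple POC with Kafka Streams. However, I get an exception when starting the application. I am using Spring-Kafka, Kafka-Streams 2.5.1 and Spring Boot 2.3.5.

Kafka Streams configuration: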

import java.util.function.Function;

import org.apache.kafka.streams.kstream.KStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class KafkaStreamsConfig {

    private static final Logger log = LoggerFactory.getLogger(KafkaStreamsConfig.class);

    // Note: input.toString() refers to the KStream object, not to the record;
    // use the lambda's key/value parameters to log the actual payload.
    @Bean
    public Function<KStream<String, String>, KStream<String, String>> processAAA() {
        return input -> input.peek((key, value) -> log
                .info("AAA Cloud Stream Kafka Stream processing : {}", input.toString().length()));
    }

    @Bean
    public Function<KStream<String, String>, KStream<String, String>> processBBB() {
        return input -> input.peek((key, value) -> log
                .info("BBB Cloud Stream Kafka Stream processing : {}", input.toString().length()));
    }

    @Bean
    public Function<KStream<String, String>, KStream<String, String>> processCCC() {
        return input -> input.peek((key, value) -> log
                .info("CCC Cloud Stream Kafka Stream processing : {}", input.toString().length()));
    }

    /*
    @Bean
    public KafkaStreams kafkaStreams(KafkaProperties kafkaProperties) {
        final Properties props = new Properties();
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootstrapServers());
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "groupId-1");
        props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, JsonSerde.class);
        props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, JsonNode.class);
        final KafkaStreams kafkaStreams = new KafkaStreams(kafkaStreamTopology(), props);
        kafkaStreams.start();
        return kafkaStreams;
    }

    @Bean
    public Topology kafkaStreamTopology() {
        final StreamsBuilder streamsBuilder = new StreamsBuilder();
        streamsBuilder.stream(Arrays.asList(AAATOPIC, BBBInputTOPIC, CCCInputTOPIC));
        return streamsBuilder.build();
    }
    */
}

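The application.yaml configuration is shown below. The idea is that I have 3 input topics and 3 output topics. Each component reads from its input topic and writes its result to the corresponding output topic.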

spring:
  application.name: consumerapp-1
  cloud:
    function:
      definition: processAAA;processBBB;processCCC
    stream:
      kafka.binder:
        brokers: 127.0.0.1:9092
        autoCreateTopics: true
        auto-add-partitions: true
      kafka.streams.binder:
        configuration:
          commit.interval.ms: 1000
          default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
          default.value.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
      bindings:
        processAAA-in-0:
          destination: aaaInputTopic
        processAAA-out-0:
          destination: aaaOutputTopic
        processBBB-in-0:
          destination: bbbInputTopic
        processBBB-out-0:
          destination: bbbOutputTopic
        processCCC-in-0:
          destination: cccInputTopic
        processCCC-out-0:
          destination: cccOutputTopic
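
The binding names in the configuration above follow Spring Cloud Stream's functional naming convention, `<functionName>-in-<index>` and `<functionName>-out-<index>`. As a rough sketch in plain Java (no Spring involved; the class and method names are hypothetical), the expected names can be derived from the `definition` value, which helps when checking the YAML for typos. It assumes every function has exactly one input and one output.

```java
import java.util.ArrayList;
import java.util.List;

public class BindingNames {

    // Hypothetical helper: expands a spring.cloud.function.definition value
    // into binding names, assuming one input and one output (index 0) per function.
    static List<String> bindingNames(String definition) {
        List<String> names = new ArrayList<>();
        for (String part : definition.split(";")) {
            String function = part.trim();
            if (function.isEmpty()) {
                continue; // skip empty segments such as a trailing ';'
            }
            names.add(function + "-in-0");
            names.add(function + "-out-0");
        }
        return names;
    }

    public static void main(String[] args) {
        // Prints the six binding names used in the YAML above, one per line.
        for (String name : bindingNames("processAAA;processBBB;processCCC")) {
            System.out.println(name);
        }
    }
}
```

Each printed name should appear as a key under `spring.cloud.stream.bindings`.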

Exception thrown:

Caused by: java.lang.IllegalArgumentException: Trying to prepareConsumerBinding public abstract void org.apache.kafka.streams.kstream.KStream.to(java.lang.String,org.apache.kafka.streams.kstream.Produced)  but no delegate has been set.
at org.springframework.util.Assert.notNull(Assert.java:201)
at org.springframework.cloud.stream.binder.kafka.streams.KStreamBoundElementFactory$KStreamWrapperHandler.invoke(KStreamBoundElementFactory.java:134)

Can anyone help me with a Kafka Streams / Spring-Kafka code sample that handles multiple input and output topics?

Update: 21-Jan-2021

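After removing the kafkaStreams and kafkaStreamTopology bean definitions from the configuration, I get the message below in an infinite loop. Message consumption still does not work. I have checked the subscriptions in application.yaml against the @Bean function definitions. They all look fine to me, but I still get this cross-wiring error. I have replaced application.properties with the application.yaml above.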

[consumerapp-1-75eec5e5-2772-4999-acf2-e9ef1e69f100-StreamThread-1] [Consumer clientId=consumerapp-1-75eec5e5-2772-4999-acf2-e9ef1e69f100-StreamThread-1-consumer, groupId=consumerapp-1] We received an assignment [cccParserTopic-0] that doesn't match our current subscription Subscribe(bbbParserTopic); it is likely that the subscription has changed since we joined the group. Will try re-join the group with current subscription
2021-01-21 14:12:43,336 WARN org.apache.kafka.clients.consumer.internals.ConsumerCoordinator [consumerapp-1-75eec5e5-2772-4999-acf2-e9ef1e69f100-StreamThread-1] [Consumer clientId=consumerapp-1-75eec5e5-2772-4999-acf2-e9ef1e69f100-StreamThread-1-consumer, groupId=consumerapp-1] We received an assignment [cccParserTopic-0] that doesn't match our current subscription Subscribe(bbbParserTopic); it is likely that the subscription has changed since we joined the group. Will try re-join the group with current subscription

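I have managed to solve the problem, and I am writing this up for the benefit of others. If you want to include multiple streams in a single application jar, the key is to define multiple application IDs, one per stream. I knew this all along, but I did not know how to define them. In the end, the answer was something I dug out of the SCSt documentation. The application.yaml can be defined as follows: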

spring:
  application.name: kafkaMultiStreamConsumer
  cloud:
    function:
      definition: processAAA; processBBB; processCCC  # needed for Imperative @StreamListener
    stream:
      kafka:
        binder:
          brokers: 127.0.0.1:9092
          min-partition-count: 3
          replication-factor: 2
          transaction:
            transaction-id-prefix: transaction-id-2000
          autoCreateTopics: true
          auto-add-partitions: true
        streams:
          binder:
            functions:
              # needed for functional
              processBBB:
                application-id: SampleBBBapplication
              processAAA:
                application-id: SampleAAAapplication
              processCCC:
                application-id: SampleCCCapplication
            configuration:
              commit.interval.ms: 1000
              default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
              default.value.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
      bindings:
        # Below is for imperative-style programming using the annotations
        # @StreamListener and @SendTo in the .java class
        inputAAA:
          destination: aaaInputTopic
        outputAAA:
          destination: aaaOutputTopic
        inputBBB:
          destination: bbbInputTopic
        outputBBB:
          destination: bbbOutputTopic
        inputCCC:
          destination: cccInputTopic
        outputCCC:
          destination: cccOutputTopic
        # Functional-style programming using Function<KStream...>. Use either style;
        # both are not required. If you define both, that is OK, but only one of them
        # takes effect. From what I have seen, @StreamListener is always the one triggered.
        # Below is for the functional style
        processAAA-in-0:
          destination: aaaInputTopic
          group: processAAA-group
        processAAA-out-0:
          destination: aaaOutputTopic
          group: processAAA-group
        processBBB-in-0:
          destination: bbbInputTopic
          group: processBBB-group
        processBBB-out-0:
          destination: bbbOutputTopic
          group: processBBB-group
        processCCC-in-0:
          destination: cccInputTopic
          group: processCCC-group
        processCCC-out-0:
          destination: cccOutputTopic
          group: processCCC-group
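
Why separate application IDs matter: Kafka Streams uses `application.id` as the consumer `group.id`, so processors that share one ID but read different topics join the same consumer group with different subscriptions, which is consistent with the "assignment ... doesn't match our current subscription" warning in the question. The following is only a sketch in plain Java that models this grouping logic; it does not talk to Kafka, and the class and method names are hypothetical.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

public class ApplicationIdCheck {

    // Hypothetical helper: returns the application IDs that are shared by
    // functions reading different input topics. Assumes both maps use the
    // same function names as keys and contain no null application IDs.
    static Set<String> sharedIdsWithDifferentTopics(Map<String, String> appIdByFunction,
                                                    Map<String, String> inputTopicByFunction) {
        Map<String, Set<String>> topicsByAppId = new TreeMap<>();
        for (Map.Entry<String, String> entry : appIdByFunction.entrySet()) {
            topicsByAppId
                    .computeIfAbsent(entry.getValue(), id -> new HashSet<>())
                    .add(inputTopicByFunction.get(entry.getKey()));
        }
        Set<String> conflicts = new TreeSet<>();
        for (Map.Entry<String, Set<String>> entry : topicsByAppId.entrySet()) {
            if (entry.getValue().size() > 1) {
                conflicts.add(entry.getKey());
            }
        }
        return conflicts;
    }

    public static void main(String[] args) {
        Map<String, String> topics = new TreeMap<>();
        topics.put("processAAA", "aaaInputTopic");
        topics.put("processBBB", "bbbInputTopic");
        topics.put("processCCC", "cccInputTopic");

        // All functions fall back to one ID, as in the question's first setup.
        Map<String, String> shared = new TreeMap<>();
        for (String function : topics.keySet()) {
            shared.put(function, "consumerapp-1");
        }

        // One ID per function, as in the YAML above.
        Map<String, String> separate = new TreeMap<>();
        separate.put("processAAA", "SampleAAAapplication");
        separate.put("processBBB", "SampleBBBapplication");
        separate.put("processCCC", "SampleCCCapplication");

        System.out.println(sharedIdsWithDifferentTopics(shared, topics));   // [consumerapp-1]
        System.out.println(sharedIdsWithDifferentTopics(separate, topics)); // []
    }
}
```

An empty result means every application ID maps to a single input topic, which is the situation the per-function `application-id` entries are meant to create.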

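With application.yaml defined, we now need to write the individual Java classes that implement the stream processing logic. A class can look like the one below. Create similar classes for the other 2 (or N) streams as required. Here is an example: AAASampleStreamTask.java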

import org.apache.kafka.streams.kstream.KStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.stereotype.Component;

@Component
@EnableBinding(AAASampleChannel.class) // One channel interface covering the in-topic and the out-topic
public class AAASampleStreamTask {

    private static final Logger log = LoggerFactory.getLogger(AAASampleStreamTask.class);

    @StreamListener(AAASampleChannel.INPUT)
    @SendTo(AAASampleChannel.OUTPUT)
    public KStream<String, String> processAAA(KStream<String, String> input) {
        input.foreach((key, value) -> log.info("Annotation AAA *Sample* Cloud Stream Kafka Stream processing {}", String.valueOf(System.currentTimeMillis())));
        ...
        // do other business logic
        ...
        return input;
    }

    /**
     * Use the style above or the one below. The style below is the newer one,
     * starting from SCSt 3.0 if I am not wrong. These are 2 different styles of
     * consuming Kafka Streams using SCSt. If both are present, the one above
     * gets priority, as per my observation.
     */
    /*
    @Bean
    public Function<KStream<String, String>, KStream<String, String>> processAAA() {
        return input -> input.peek((key, value) -> log.info(
                "Functional AAA *Sample* Cloud Stream Kafka Stream processing : {}", String.valueOf(System.currentTimeMillis())));
        ...
        // do other business logic
        ...
    }
    */
}
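
A channel interface is needed if you want to use the imperative style rather than the functional style. AAASampleChannel.java

import org.apache.kafka.streams.kstream.KStream;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;

public interface AAASampleChannel {

    // Binding names; they must match the keys under spring.cloud.stream.bindings
    String INPUT = "inputAAA";
    String OUTPUT = "outputAAA";

    @Input(INPUT)
    KStream<String, String> inputAAA();

    @Output(OUTPUT)
    KStream<String, String> outputAAA();
}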

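It looks like you are mixing Spring Cloud Stream and Spring Kafka in the application. When using the binder, you do not need to directly define the components that Spring Kafka requires, such as the KafkaStreams instance and its Topology; SCSt creates them implicitly. Can you remove the following beans and try again?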

@Bean
public KafkaStreams kafkaStreams(KafkaProperties kafkaProperties) {

@Bean
public Topology kafkaStreamTopology() {

If you still face issues, please share a small reproducible sample so that we can triage further.
