I'm new to Kafka and Kafka Streams. I'm trying to join two streams and push the output to a third topic. I've spent several days trying various implementations and am now stuck on the error below and can't move forward. Can anyone help?
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.Joined;
import org.apache.kafka.streams.kstream.KStream;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import org.springframework.kafka.support.serializer.JsonSerializer;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.time.Duration;
import java.util.Properties;
@RestController
public class KafkaProcessingController {

    private KafkaStreams streamsInnerJoin;

    private Properties properties() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "stream-stream-inner-join");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        return props;
    }

    private void streamsInnerJoinStart(StreamsBuilder builder) {
        if (streamsInnerJoin != null) {
            streamsInnerJoin.close();
        }
        final Topology topology = builder.build();
        streamsInnerJoin = new KafkaStreams(topology, properties());
        streamsInnerJoin.start();
    }

    @RequestMapping("/startStreamStreamInnerJoin2/")
    public void startStreamStreamInnerJoin2() {
        final StreamsBuilder builder = new StreamsBuilder();
        KStream<String, Item> leftSource = builder.stream("my-kafka-left-stream-topic",
                Consumed.with(Serdes.String(), new SchoolSerde()));
        KStream<String, Item> rightSource = builder.stream("my-kafka-right-stream-topic",
                Consumed.with(Serdes.String(), new SchoolSerde()));
        KStream<String, Item> joined = leftSource
                .selectKey((key, value) -> value.getName())
                .join(rightSource.selectKey((key, value) -> value.getName()),
                        (value1, value2) -> {
                            System.out.println("value2.getName() >> " + value1.getName() + value2.getName());
                            return value2;
                        },
                        JoinWindows.of(Duration.ofMinutes(5)),
                        Joined.with(
                                Serdes.String(),
                                new SchoolSerde(),
                                new SchoolSerde()));
        joined.to("my-kafka-stream-stream-inner-join-out");
        streamsInnerJoinStart(builder);
    }

    public class SchoolSerde extends Serdes.WrapperSerde<Item> {
        public SchoolSerde() {
            super(new JsonSerializer<>(), new JsonDeserializer<>(Item.class));
        }
    }
}
This is what I have in the Kafka topics (the same on both topics):
CreateTime:1588414271850 1 {"id":1,"name":"nuwan","category":"home"}
Item.java
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;

@JsonIgnoreProperties(ignoreUnknown = true)
public class Item {

    private int id;
    private String name;
    private String category;

    @JsonCreator
    public Item(@JsonProperty("id") int id, @JsonProperty("name") String name, @JsonProperty("category") String category) {
        this.id = id;
        this.name = name;
        this.category = category;
    }

    public int getId() {
        return id;
    }

    public void setId(int id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getCategory() {
        return category;
    }

    public void setCategory(String category) {
        this.category = category;
    }

    @Override
    public String toString() {
        return "Item{" +
                "id=" + id +
                ", name='" + name + '\'' +
                ", category='" + category + '\'' +
                '}';
    }
}
The full error:
Exception in thread "stream-stream-inner-join-0816e64b-5e97-4ca1-bb08-5976d3506e33-StreamThread-1" org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=2_0, processor=KSTREAM-SOURCE-0000000006, topic=stream-stream-inner-join-KSTREAM-KEY-SELECT-0000000002-repartition, partition=0, offset=5, stacktrace=org.apache.kafka.streams.errors.StreamsException: A serializer (key: org.apache.kafka.common.serialization.StringSerializer / value: org.apache.kafka.common.serialization.StringSerializer) is not compatible to the actual key or value type (key type: java.lang.String / value type: ex4.Item). Change the default Serdes in StreamConfig or provide correct Serdes via method parameters.
The error is caused by incorrect Serdes being used for the repartitioning triggered by the selectKey operation.

Just for some context: whenever you change the key and then perform a join or an aggregation, Kafka Streams automatically repartitions the data to make sure records with the changed key end up on the correct partition.
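That rekey-then-shuffle step can be illustrated with a plain-Java sketch. Note this is a toy partitioner based on String.hashCode(), purely for illustration (Kafka's real default partitioner uses murmur2), and the keys are made up:

```java
// Toy model of why selectKey forces a repartition: the partition a record
// lives on is derived from its key, so once the key changes, the record
// may belong on a different partition and has to be physically rewritten
// to an internal repartition topic before the join can run.
public class RepartitionSketch {

    // Simplified stand-in for Kafka's partitioner (the real one uses murmur2).
    static int partitionFor(String key, int numPartitions) {
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        int numPartitions = 5;
        String oldKey = "1";       // original record key
        String newKey = "nuwan";   // key after selectKey((k, v) -> v.getName())

        int before = partitionFor(oldKey, numPartitions);
        int after = partitionFor(newKey, numPartitions);

        // The record lands on a different partition under the new key, so a
        // join on that key only works after the data has been redistributed.
        System.out.println("before=" + before + " after=" + after);
    }
}
```

The repartition topic is written with whatever Serdes apply at that point, which is exactly where your exception is thrown.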
However, as far as I can see, you are providing the correct Serdes for that repartitioning via the Joined parameter:
Joined.with(Serdes.String(),
new SchoolSerde(),
new SchoolSerde())
but for some reason the default Serdes from the config are being used instead. Rather than a debugging session, you could try the following workaround, repartitioning each stream explicitly:
leftSource.selectKey((key, value) -> value.getName())
        .through("left-source-repartition",
                Produced.with(Serdes.String(), new SchoolSerde()))
        .join(
                rightSource.selectKey((key, value) -> value.getName())
                        .through("right-source-repartition",
                                Produced.with(Serdes.String(), new SchoolSerde())),
                .....
(Produced comes from org.apache.kafka.streams.kstream.Produced, so you'll need that import as well.)
This workaround should get you going again. Edit: I forgot to mention that you need to create the topics for the through operation yourself, and the new topics need to have:
- the same number of partitions as each other
- at least as many partitions as the source topics, ideally the same number
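To see why the partition counts have to line up, here is a minimal sketch using a toy hashCode-based partitioner (not Kafka's real murmur2 one, and the key is made up): the same join key only maps to the same partition number on both sides when both topics have the same number of partitions.

```java
// Why the two repartition topics must be co-partitioned: the join pairs
// records partition-by-partition, and a record's partition number depends
// on the topic's partition count.
public class CoPartitioningSketch {

    // Simplified stand-in for Kafka's default partitioner.
    static int partitionFor(String key, int numPartitions) {
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        String joinKey = "d"; // some key produced by selectKey

        // Matching partition counts: the key lands on the same partition
        // number in both topics, so the join can find matching records.
        System.out.println(partitionFor(joinKey, 4) + " / " + partitionFor(joinKey, 4));

        // Mismatched partition counts: the same key can land on different
        // partition numbers, and the join silently finds no matches.
        System.out.println(partitionFor(joinKey, 4) + " / " + partitionFor(joinKey, 6));
    }
}
```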
What version of Kafka Streams are you using?
Let me know how it goes.