由于API从Kafka 0.11更改为Kafka 2.0,我们面临一个问题。在我们基于0.11的Kafka流应用程序中,我们在两个使用命名状态存储的KTables[String,Something]
之间进行了连接:
val joinedTable = {
myClass1Table.leftJoin[MyClass1,MyClass2](myClass2Table,new
MyJoiner, new MySerde[MyClass1Class2],"my-join-store")
}
然而,当迁移到2.0时,显式提供状态存储的唯一方法是:
val joinedTable = {
val materialized = Materialized.as[String,MyClass1,KeyValueStore[Bytes,Array[Byte]]]("join-store").withValueSerde(new Serde[MyClass1Class2])
myClass1Table.leftJoin[MyClass1,MyClass2](myClass2Table,new
MyJoiner,materialized)
}
使用此代码,在生产中替换应用程序实例失败,因为Kafka 0.11中的状态存储可能同时使用了myTable1和myTable2的密钥序列。
org.apache.kafka.streams.errors.StreamsException: A serializer (key: org.apache.kafka.common.serialization.ByteArraySerializer) is not compatible to the actual key type (key type: java.lang.String). Change the default Serdes in StreamConfig or provide correct Serdes via method parameters.
at org.apache.kafka.streams.state.StateSerdes.rawKey(StateSerdes.java:174)
除了执行kafka流应用程序重置外,还有其他更好的方法来处理这个问题吗?
您可以显式地传入密钥序列:
val joinedTable = {
val materialized = Materialized
.as[String,MyClass1,KeyValueStore[Bytes,Array[Byte]]]("join-store")
.withKeySerde(new StringSerde())
.withValueSerde(new Serde[MyClass1Class2])
myClass1Table.leftJoin[MyClass1,MyClass2](myClass2Table, new MyJoiner, materialized)
}