如何将带有显式状态存储的KTable KTable leftJoin从Kafka 0.11迁移到Kafka 2.0



由于API从Kafka 0.11更改为Kafka 2.0,我们面临一个问题。在我们基于0.11的Kafka流应用程序中,我们在两个使用命名状态存储的KTables[String,Something]之间进行了连接:

val joinedTable = {
myClass1Table.leftJoin[MyClass1,MyClass2](myClass2Table,new
MyJoiner, new MySerde[MyClass1Class2],"my-join-store")
}

然而,当迁移到2.0时,显式提供状态存储的唯一方法是:

val joinedTable = {
val materialized = Materialized.as[String,MyClass1,KeyValueStore[Bytes,Array[Byte]]]("join-store").withValueSerde(new Serde[MyClass1Class2])
myClass1Table.leftJoin[MyClass1,MyClass2](myClass2Table,new
MyJoiner,materialized)
}

使用此代码,在生产中替换应用程序实例失败,因为Kafka 0.11中的状态存储可能同时使用了myTable1和myTable2的密钥序列。

org.apache.kafka.streams.errors.StreamsException: A serializer (key: org.apache.kafka.common.serialization.ByteArraySerializer) is not compatible to the actual key type (key type: java.lang.String). Change the default Serdes in StreamConfig or provide correct Serdes via method parameters.
at org.apache.kafka.streams.state.StateSerdes.rawKey(StateSerdes.java:174)

除了执行kafka流应用程序重置外,还有其他更好的方法来处理这个问题吗?

您可以显式地传入密钥序列:

val joinedTable = {
val materialized = Materialized
.as[String,MyClass1,KeyValueStore[Bytes,Array[Byte]]]("join-store")
.withKeySerde(new StringSerde())
.withValueSerde(new Serde[MyClass1Class2])
myClass1Table.leftJoin[MyClass1,MyClass2](myClass2Table, new MyJoiner, materialized)
}

最新更新