Flink动态路由Kafka主题的解决方案是实现KeyedSerializationSchema并覆盖getTargetTopic,但是KeyedSerializationSchema已被弃用,应该使用KafkaSerializationSchema。此接口不提供 getTargetTopic 或类似内容。
那么,在 Flink 中,Kafka 动态路由应该如何工作,因为 getTargetTopic 已经不存在了?
KafkaSerializationSchema. serialize
返回ProducerRecord<byte[], byte[]>
这个ProducerRecord
有一个主题。 您可以使用像 https://kafka.apache.org/10/javadoc/org/apache/kafka/clients/producer/ProducerRecord.html#ProducerRecord-java.lang.String-K-V- 这样的构造函数 以注入主题。
考虑到这一点,您只需要创建一个类似
String dynamicTopic(T element, @Nullable Long timestamp)
您的KafkaSerializationSchema
实现只需要使用它
ProducerRecord<byte[], byte[]> serialize(T element, @Nullable Long timestamp){
...
return new ProducerRecord(dynamicTopic(element, timestamp), aKey, aValue);
}