使用flink发送多个主题的卡夫卡接收器



我有一个数据流,如下所示:

DataStream[myTuple(topic, value)]

我想在相关主题中发送一个特定的值。

所以我尝试做这样的事情:

new FlinkKafkaProducer010[myTuple](
"default_topic",
new KeyedSerializationSchema[myTuple](){
override def getTargetTopic(element: myTuple): String = element.topic
override def serializeKey(element: myTuple): Array[Byte] = null
override def serializeValue(element: myTuple): Array[Byte] = new SimpleStringSchema().serialize(element.value)
},
properties)

但它不起作用,我有这个警告:

WARN  org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase  - Overwriting the 'key.serializer' is not recommended
WARN  org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase  - Overwriting the 'value.serializer' is not recommended

我不知道该怎么做,换句话说。 感谢您的帮助。

您可能在属性中设置了key.serializervalue.serializer。你不应该这样做,因为这样你就会覆盖 Flink 内部使用的序列化程序 (ByteArraySerializers(。删除这些属性,您的代码应该可以工作。

相关内容

  • 没有找到相关文章

最新更新