kafka streams-一般性魔术字节在genericavroserde上



尝试使用kafka流传输avro数据时,我遇到了这个错误:

Exception in thread "StreamThread-1" org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1 Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!

即使我在邮件列表中发现了几个较旧的线程,但没有解决问题的解决方案解决了问题。所以希望我可以在这里找到一个解决方案。

我的设置看起来如下:

StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String.getClass.getName
StreamsConfig.VALUE_SERDE_CLASS_CONFIG, classOf[GenericAvroSerde]   
AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, localhost:8081)  

我已经尝试将KEY_SERDE设置为与VALUE_SERDE相同,但是即使它被"标记"为邮件列表中的解决方案,但在我的情况下它也不起作用。

我正在使用我的架构生成GenericData.Record如下:

val record = new GenericData.Record(schema)
...
record.put(field, value)

当我启动调试模式并检查生成的记录时,一切看起来都不错,记录中有数据,映射是正确的。

我像这样流式传输kStream(我以前使用过分支):

splitTopics.get(0).to(s"${destTopic}_Testing")

我正在使用GenericData.Record进行记录。与GenericAvroSerde结合使用这可能是一个问题?

解决我问题的解决方案是在我从输入主题中获得的字符串值后交换VALUE_SERDE

由于 Serde是序列化和挑选化的组合"元素",所以我不能简单地使用 StreamsConfig.VALUE_SERDE_CLASS_CONFIG, classOf[GenericAvroSerde],而必须使用 StringSerde进行反序列化输入记录,然后使用 AvroSerde将其写出输出主题。




现在看起来像这样:

// default streams configuration serdes are different from the actual output configurations
val streamsConfiguration: Properties = {
  val p = new Properties()
  p.put(StreamsConfig.APPLICATION_ID_CONFIG, kStreamsConf.getString("APPLICATION_ID"))
  p.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kStreamsConf.getString("BOOTSTRAP_SERVERS_CONFIG"))
  p.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, kStreamsConf.getString("AUTO_OFFSET_RESET_CONFIG"))
  p.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String.getClass.getName)
  p.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String.getClass.getName)
  p.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, kStreamsConf.getString("SCHEMA_REGISTRY_URL_CONFIG"))
  p
}
// adjusted output serdes for avro records
val keySerde: Serde[String] = Serdes.String
val valSerde: Serde[GenericData.Record] = new GenericAvroSerde()
valSerde.configure(
  Collections.singletonMap(
    AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,
    streamsConfiguration.get(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG)
  ),
  /* isKeySerde = */ false
)
// Now using the adjusted serdes to write to output like this
stream.to(keySerde, valSerde, "destTopic")

以这种方式,它像魅力一样工作。
谢谢

相关内容

  • 没有找到相关文章

最新更新