无法将 Kafka 流反序列化为 pojo。找不到编写器架构中指定的类



从 kafka 主题读取时出现异常:由以下原因引起: org.apache.kafka.common.errors.SerializationException: Error 反序列化 id 1 的 Avro 消息 由以下原因引起: org.apache.kafka.common.errors.SerializationException: 找不到 在查找读取器的架构中指定的类 USERS,同时查找读取器的 特定记录的架构。

我认为反序列化是不正确的,我也找不到任何合适的例子。

    Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG , "TEST");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");
    props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081");
            props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,Serdes.String().getClass().getName());
                    props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,SpecificAvroSerde.class);
                    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
                    props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);
    StreamsBuilder builder = new StreamsBuilder();


    KStream<String,USERS> valid = builder.stream("testUSERS");
valid.foreach((k,v)-> System.out.println("v ="+v.getUSERNAME()));
valid.foreach((k,v)-> System.out.println("k ="+k));
KafkaStreams streams = new KafkaStreams(builder.build(),props);
streams.cleanUp();
streams.start();

您正在使用 kafka 的配置来要求特定模式,在这种情况下,使用的模式必须存在于您的类路径中,以便可以解析它,如果没有,请配置为使用 GenericRecord

props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, false);

相关内容

  • 没有找到相关文章

最新更新