thrift序列化和kafka流中的避难所化



我正在使用thrift来进行序列化和避难所,以改进性能,同时从kafka流传输字节数据

当我避免时,我会继续遇到此错误:

org.apache.thrift.protocol.tprotocolexception:未识别的类型123

我的代码

public void streamMessageByte() {   
    final StreamsBuilder builder = new StreamsBuilder();
    KStream<Integer, byte[]> stream = builder.stream(kafka_topic);
    deserializer = new TDeserializer();
    serializer = new TSerializer();
    //Thrift class pojo object is 'deser' which matches byte array data format
    stream.map((k,v){
        try{
            deserializer.deserialize(deser, v);
        }
        catch(TException e){
        }
    null;
});
   

我遇到了这个问题,当我使用不同的协议来序列化和进行序列化时。

序列化器是usind ObjectMapper,而deSerializer则与 TBinaryProtocol一起使用 TDeserializer。示例:

@Test
public void testSerDe() throws TException, JsonProcessingException {
   final Person person = new Person("Thomas", Byte.valueOf("23"));
   JsonSerializer serializer = new JsonSerializer();
   ObjectMapper mapper = new ObjectMapper();
   byte[] serialized = mapper.writeValueAsString(person).getBytes();
   TDeserializer deserializer = new TDeserializer(TBinaryProtocol::new);
   Person desPerson = new Person();
   deserializer.deserialize(desPerson, serialized);
   assertEquals(person, desPerson);
}

那会抛出 org.apache.thrift.protocol.TProtocolException: Unrecognized type 123

,如果您以相同的方式序列化并进行序列化,则应起作用。这是一个示例:

@Test
public void testSerDe() throws TException {
   // Given
   final Person person = new Person("Thomas", Byte.valueOf("23"));
   TSerializer serializer = new TSerializer(TBinaryProtocol::new);
   // When
   byte[] serializedPerson = serializer.serialize(person);
   // Then
   TDeserializer deserializer = new TDeserializer(TBinaryProtocol::new);
   GraphEvent dePerson = new Person();
   deserializer.deserialize(dePerson, serializedPerson);
   assertEquals(person, dePerson);
}

相关内容

  • 没有找到相关文章

最新更新