我正在使用thrift来进行序列化和避难所,以改进性能,同时从kafka流传输字节数据
当我避免时,我会继续遇到此错误:
org.apache.thrift.protocol.tprotocolexception:未识别的类型123
我的代码
public void streamMessageByte() {
final StreamsBuilder builder = new StreamsBuilder();
KStream<Integer, byte[]> stream = builder.stream(kafka_topic);
deserializer = new TDeserializer();
serializer = new TSerializer();
//Thrift class pojo object is 'deser' which matches byte array data format
stream.map((k,v){
try{
deserializer.deserialize(deser, v);
}
catch(TException e){
}
null;
});
我遇到了这个问题,当我使用不同的协议来序列化和进行序列化时。
序列化器是usind ObjectMapper
,而deSerializer则与 TBinaryProtocol
一起使用 TDeserializer
。示例:
@Test
public void testSerDe() throws TException, JsonProcessingException {
final Person person = new Person("Thomas", Byte.valueOf("23"));
JsonSerializer serializer = new JsonSerializer();
ObjectMapper mapper = new ObjectMapper();
byte[] serialized = mapper.writeValueAsString(person).getBytes();
TDeserializer deserializer = new TDeserializer(TBinaryProtocol::new);
Person desPerson = new Person();
deserializer.deserialize(desPerson, serialized);
assertEquals(person, desPerson);
}
那会抛出 org.apache.thrift.protocol.TProtocolException: Unrecognized type 123
,如果您以相同的方式序列化并进行序列化,则应起作用。这是一个示例:
@Test
public void testSerDe() throws TException {
// Given
final Person person = new Person("Thomas", Byte.valueOf("23"));
TSerializer serializer = new TSerializer(TBinaryProtocol::new);
// When
byte[] serializedPerson = serializer.serialize(person);
// Then
TDeserializer deserializer = new TDeserializer(TBinaryProtocol::new);
GraphEvent dePerson = new Person();
deserializer.deserialize(dePerson, serializedPerson);
assertEquals(person, dePerson);
}