我正在尝试消耗一些主题。问题是,当生产者序列化数据时,他在一条消息中放置了几个对象。当我反序列化一条消息时,我只得到一个对象,而丢失了其他对象。
我的schema是这样的。
` "fields": [
{
"name": "instanceuid",
"type": "string"
},
{
"name": "s",
"type": "int"
},
{
"name": "t",
"type": "string"
}
]
`
我的反序列化类来自于这样的演示
`public T deserialize(String topic, byte[] data) {
try {
T result = null;
if(data == null) {
return null;
}
LOGGER.debug("data='{}'", DatatypeConverter.printHexBinary(data));
ByteArrayInputStream in = new ByteArrayInputStream(data);
DatumReader<GenericRecord> userDatumReader = new SpecificDatumReader<>(targetType.newInstance().getSchema());
BinaryDecoder decoder = DecoderFactory.get().directBinaryDecoder(in, null);
List<GenericRecord> records = new ArrayList<GenericRecord>();
while(true){
try {
GenericRecord record = userDatumReader.read(null, decoder);
records.add(record);
} catch (EOFException eof) {
break;
}
}
// result = (T) userDatumReader.read(null, decoder);
LOGGER.info("deserialized data='{}'", records);
result = (T) records;
return result;
} catch (Exception ex) {
throw new SerializationException(
"Can't deserialize data '" + Arrays.toString(data) + "' from topic '" + topic + "'", ex);
} finally {
}
}
}`
我的代码工作,如果消息只包含一个对象。
现在我试图得到我的反序列化的数据,但我得到错误,它是这样的
java.util.ArrayList cannot be cast to org.apache.avro.specific.SpecificRecordBase"
我的类实现了反序列化器,所以反序列化方法是T类型的。
那么我怎么能返回一个T类型与列表类型?
我没有使用confluent来注册某物的schema
在一条消息中放置多个对象
就Kafka而言,这是完全"有效"的。现在,您需要解析字节…
如果没有看到数据的实际字节数,很难回答为什么会出现错误,但这里有一些提示。
- 如果你有这个模式,你应该使用Maven插件来创建一个类,而不是使用GenericRecord。 Kafka通常与Schema Registry一起使用,比如Confluent提供的Schema Registry。Confluent提供了自己的
- 你的模式没有Avro数组类型,因此你已经知道你的解析器在试图用数组列表做一些事情时搞砸了
- 不能返回
T
。您可以返回一个具有List字段的类。你会implements Deserializer<ClassWithList>
,而不是有一个原始接口…
KafkaAvroDeserializer
类,但也有其他实现,因此您不需要编写自己的实现。更具体地说,BinaryDecoder将不能使用合流序列化的生产者数据。如前所述,您需要检查原始字节,而不能仅仅假设数据中有单个Avro记录。