在使用者中反序列化加密的kafka消息



我有一个kafka主题,它包含没有avro模式的消息。我们最近想用avro模式向这个主题推送消息。

现在,该主题具有带/不带模式的两条消息。

我有一个消费者从这个话题中消费。

->如果我在使用者配置中将value.deserializer设置为"KafkaAvroDeserializer.class",我看不到任何消息被消费。

->如果我在使用者配置中将value.deserializer设置为"StringDeserializer.class",我就可以使用这些消息,但是,具有avro模式的消息现在看起来是加密的。

例如:ConsumerRecord(topic = sample-events, partition = 2, offset = 1089595, CreateTime = 1544116093932, checksum = 2421249481, serialized key size = -1, serialized value size = 159, key = null, value = ���test_impressLbhpb_extranet_opportunity_cleaning_fecron�����YH00756f54-ba55-11e7-8df0-fdb86cefa6ed$abcde)

我已经为avro模式生成了java类,我想将来自消费者的带有/不带有模式的消息转换为这个生成的avro java类。我能够使用objectMapper将没有模式的消息映射到avro-java类。

但对于来自消费者的带有avro模式的消息,如示例中所述,这些消息看起来像是加密的,我正在尝试以下代码片段:

SpecificDatumReader<SampleEvents> reader = new SpecificDatumReader<SampleEvents>(SampleEvents.getClassSchema());
Decoder decoder = DecoderFactory.get().binaryDecoder(new ByteArrayInputStream(record), null);
SampleEvents event = reader.read(null, decoder);

但这行不通。我得到"错误:

java.lang.ArrayIndexOutOfBoundsException:-1".

如何取消对此消息的序列化?

如果我在consumer配置中将value.deserializer设置为"KafkaAvroDeserializer.class",我看不到任何消息被消费。

嗯,你至少应该得到一个HTTP或反序列化程序错误。。。


首先,您应该使用BytesDeserializer或其变体

然后,您需要熟悉ByteBuffer的方法,并将byte[]转化为一个。。。。

如果您有模式注册表编码的Avro消息,那么这些消息具有定义良好的有线格式

因此,您可以有如下内容,但最终,它需要对主题中可能存在的数据进行一些推断。

// consumerConfig.put("value.deserializer", ByteBufferDeserializer.class)
ByteBuffer buf = record.value();
Deserializer d;
if (buf == null) {
System.err.println("Tombstoned record");
} else if (buf.get() == 0x0) { // Check for Avro
int schemaId = buf.getInt();  // If you wanted it
d = new KafkaAvroDeserializer();        
Map<String, String> config = new HashMap<>();
config.put("schema.registry.url", "http://..."); // address to registry
boolean isKey = false;
d.configure(config, isKey);
AvroValue v = d.deserialize(value);
// TODO: Handle record
} else {
try {
d = new StringDeserializer();
String s = d.deserialize(value);
// TODO: Handle record
} catch (Exception e) {
e.printStackTrace();
}
}

要点:不要在主题中生成Avro和非Avro数据类型。否则,您只需要以字节形式消费,并自己处理自定义逻辑。

最新更新