从Confluent Control Center JSON表示中创建测试数据



我正在尝试为Kafka Streams编写一些单元测试,并且有许多非常复杂的模式需要合并到我的测试中。

与其每次都从头开始创建对象,我更希望使用一些真实数据进行实例化,并对其进行测试。我们使用Confluent处理Avro格式的记录,并且可以从Control Center应用程序中提取模式和类似json的文本表示。JSON是有效的JSON,但它不是真正的形式,如果你只是写数据的JSON表示,所以我认为它是一些底层AVRO的文本形式的表示。

我已经使用模式来创建Java SpecificRecord类(price_assessment),并希望使用从Control Center消息复制的JSON字符串来填充该类的新实例,以提供给我的单元测试InputTopic。

目前为止我试过的代码是

var testAvroString = "{JSON copied from Control Center topic}";
Schema schema = price_assessment.getClassSchema();
DecoderFactory decoderFactory = new DecoderFactory();
Decoder decoder = null;
try {
DatumReader<price_assessment> reader = new SpecificDatumReader<price_assessment>();
decoder = decoderFactory.get().jsonDecoder(schema, testAvroString);
return reader.read(null, decoder);
} catch (Exception e)
{
return null;
}

改编自另一个使用GenericRecords的SO答案。当我尝试运行这个,虽然我得到异常Cannot invoke "org.apache.avro.Schema.equals(Object)" because "writer" is nullreader.read(...)步骤。

我不太熟悉流测试或Java,我不确定我到底做错了什么。在Java 17中编写,流版本为3.1.0,但在

版本中很灵活

我设法想出的解决方案如下,这似乎是有效的:

private static <T> T avroStringToInstance(Schema classSchema, String testAvroString) {
DecoderFactory decoderFactory = new DecoderFactory();
GenericRecord genericRecord = null;
try {
Decoder decoder = decoderFactory.jsonDecoder(classSchema, testAvroString);
DatumReader<GenericData.Record> reader =
new GenericDatumReader<>(classSchema);
genericRecord = reader.read(null, decoder);
} catch (Exception e)
{
return null;
}
var specific = (T) SpecificData.get().deepCopy(genericRecord.getSchema(), genericRecord);
return specific;
}

相关内容

  • 没有找到相关文章

最新更新