将 C Sharp 格式的 IEnumerable 序列化为<T> Avro 格式的最佳方法



我正在开发一个C Sharp sdk,它接收IEnumerable数据,然后将其发送到一个restful API。然后,restful API将把这些记录推送给Kafka。

我已经有了schemaString(this.schemaString(,下面是我对SDK序列化部分的实现:

public string ValidateAvroSchema<T>(IEnumerable<T> value) {
using(var ms = new MemoryStream()){
try{
Avro.IO.Encoder e = new BinaryEncoder(ms);
var schema = Schema.Parse(this.schemaString) as RecordSchema;
var writer = new GenericDatumWriter<GenericRecord>(schema);
foreach(T item in value) {
GenericRecord record = new GenericRecord(schema);
FieldInfo[] fieldsInfo;
Type typeParameterType = typeof(T);
var type = item.GetType();
fieldsInfo = typeParameterType.GetFields();
for (int i = 0; i < fieldsInfo.Length; i++)
{
record.Add(fieldsInfo[i].Name, GetFieldValue(item, fieldsInfo[i].Name));
}
writer.Write(record, e);
}
// I am passing this string to Restful API so the Java side can parse it
return Convert.ToBase64String(ms.ToArray());;
} catch (AvroException e) {
// handle exception
}
}
}

在API上,我做了这样的事情:

byte[] input = Base64.decodeBase64(payloadInJson.toString());
List<GenericRecord> listOfRecords = new ArrayList<>();
GenericDatumReader<GenericRecord> reader = new GenericDatumReader<>(schema);
InputStream inputStream = new ByteArrayInputStream(input);
BinaryDecoder decoder = new DecoderFactory().get().binaryDecoder(inputStream, null);
while(true){
try {
GenericRecord record = reader.read(null, decoder);
listOfRecords.add(record);
} catch (EOFException eof) {
break;
}
}

它现在起作用了。谢谢你们。

只剩下一个问题了。

问题1:使用反射获取的所有属性,然后将它们添加到GenericRecord中是否合适?看起来很贵。

非常感谢。

在我看来,最方便的方法是使用:

public string ValidateAvroSchema<T>(IEnumerable<T> value) {
byte[] result = AvroConvert.SerializeHeadless(value.ToList(), this.schemaString);
return Convert.ToBase64String(result);;
}  

请记住,在这种情况下,模式是T的数组模式。要生成它,您可以使用:

AvroConvert.GenerateSchema(typeof(List<T>));

在API方面:

var deserialized = AvroConvert.DeserializeHeadless<List<T>>(result, schema);

来自https://github.com/AdrianStrugala/AvroConvert

最新更新