我正在开发一个C Sharp sdk,它接收IEnumerable数据,然后将其发送到一个restful API。然后,restful API将把这些记录推送给Kafka。
我已经有了schemaString(this.schemaString(,下面是我对SDK序列化部分的实现:
public string ValidateAvroSchema<T>(IEnumerable<T> value) {
using(var ms = new MemoryStream()){
try{
Avro.IO.Encoder e = new BinaryEncoder(ms);
var schema = Schema.Parse(this.schemaString) as RecordSchema;
var writer = new GenericDatumWriter<GenericRecord>(schema);
foreach(T item in value) {
GenericRecord record = new GenericRecord(schema);
FieldInfo[] fieldsInfo;
Type typeParameterType = typeof(T);
var type = item.GetType();
fieldsInfo = typeParameterType.GetFields();
for (int i = 0; i < fieldsInfo.Length; i++)
{
record.Add(fieldsInfo[i].Name, GetFieldValue(item, fieldsInfo[i].Name));
}
writer.Write(record, e);
}
// I am passing this string to Restful API so the Java side can parse it
return Convert.ToBase64String(ms.ToArray());;
} catch (AvroException e) {
// handle exception
}
}
}
在API上,我做了这样的事情:
byte[] input = Base64.decodeBase64(payloadInJson.toString());
List<GenericRecord> listOfRecords = new ArrayList<>();
GenericDatumReader<GenericRecord> reader = new GenericDatumReader<>(schema);
InputStream inputStream = new ByteArrayInputStream(input);
BinaryDecoder decoder = new DecoderFactory().get().binaryDecoder(inputStream, null);
while(true){
try {
GenericRecord record = reader.read(null, decoder);
listOfRecords.add(record);
} catch (EOFException eof) {
break;
}
}
它现在起作用了。谢谢你们。
只剩下一个问题了。
问题1:使用反射获取的所有属性,然后将它们添加到GenericRecord中是否合适?看起来很贵。
非常感谢。
在我看来,最方便的方法是使用:
public string ValidateAvroSchema<T>(IEnumerable<T> value) {
byte[] result = AvroConvert.SerializeHeadless(value.ToList(), this.schemaString);
return Convert.ToBase64String(result);;
}
请记住,在这种情况下,模式是T的数组模式。要生成它,您可以使用:
AvroConvert.GenerateSchema(typeof(List<T>));
在API方面:
var deserialized = AvroConvert.DeserializeHeadless<List<T>>(result, schema);
来自https://github.com/AdrianStrugala/AvroConvert