Microsoft Avro序列化程序正在破坏提供的架构



我正试图写出Avro文件,但在序列化方面遇到了一些真正的问题。我使用的是Microsoft.Avro.Core,最近发现,当我给它一个包含type和关联logicalType的架构时,它会莫名其妙地提取内部type,并用它来替换它!这意味着我对"type": {"type": "long", "logicalType": "timestamp-micros"}DateTime声明现在是一个简单的"type": "long",接收方无法正确解释它。

如果它只是在内部这样做,以了解它使用的数据类型,那将是一回事。但这个被破坏的模式实际上是被写入输出文件的,这是完全不正确的行为。有人知道解决这个问题的方法吗?

(是的,这个库已经5年没有更新了,可能完全不受支持。但它是我能找到的唯一一个满足一个关键要求的.NET Avro序列化程序:允许我处理编译时未知的任意类型。其他一切似乎都只想使用T类型的通用序列化程序,但我的用例无法提供T。所以我不能放弃除非真的有更好的东西我可以用。但如果有的话,我愿意接受。(

经过大量的搜索和浏览文档不完善的源代码,我找到了一个解决方案。尽管Apache的官方Avro库确实需要为其所有读写器提供一个泛型类型参数,但如果将类型指定为GenericRecord,则可以将GenericRecord作为运行时定义的结构使用。(这不是一个任意的动态类型,因为分配给它的字段的值仍然必须与提供的模式匹配。(

同时,这个库的类型系统对Avro的类型集的支持比废弃的Microsoft类型系统要广泛得多。它正确地处理Avro的逻辑类型,并按照您期望的方式在它们和CLR类型之间进行转换,但有一个显著的例外:序列化DateTime将把它从系统的本地时间转换为UTC。(可能有办法解决这个问题,但我还没有找到。(

把这个留在这里,以防将来有人遇到类似的问题。

我试图重现这种行为,并编写了一个不使用泛型的小程序。

事实上,从我所看到的情况来看,DateTime子类型仍然从模式中省略,这真的很令人困惑和沮丧,因为接收者需要提前知道,可能是通过使用字段名?!,哪些长字段属于DateTime类型,哪些不是。?!默认情况下,它使用Ticks来解析DateTimes。我仔细查看了github上的Avro库,发现它使用runtimeType表示DateTime和DateTimeOffset。

它可能对你没有太大帮助,但可能对有类似问题的人有帮助。

[DataContract]
public struct TestMsg
{
[DataMember]
public int Id;
[DataMember]
public double Amount;
[DataMember]
public DateTime OrderSubmitted;
public TestMsg(int id, double amount, DateTime orderSubmitted)
{
Id = id;
Amount = amount;
OrderSubmitted = orderSubmitted;
}
}

internal class Program
{
static void Main(string[] args)
{
string line = Environment.NewLine;
string fileName = "Messages.avro";
string filePath = null;
if (Environment.NewLine.Contains("r"))
{
filePath = new DirectoryInfo(".") + @"" + fileName;
}
else
{
filePath = new DirectoryInfo(".") + @"/" + fileName;
}
ArrayList al = new ArrayList
{
new TestMsg(1, 189.12, DateTime.Now),
new TestMsg(2, 345.94, new DateTime(2000, 1, 10, 15, 20, 23, 103))
};
var schema = @"
{
""type"" : ""record"",
""name"" : ""TestAvro.TestMsg"",
""fields"" : [
{
""name"" : ""Id"",
""type"": ""int""
},
{
""name"" : ""Amount"",
""type"":  ""double""
},
{
""name"" : ""OrderSubmitted"",
""type"": ""long"",
""runtimeType"": ""DateTime""
}
]
}";

using (var dataStream = new FileStream(filePath, FileMode.Create))
{
var serializer = AvroSerializer.CreateGeneric(schema);
using (var avroWriter = AvroContainer.CreateGenericWriter(schema, dataStream, Codec.Null))
{

using (var seqWriter = new SequentialWriter<object>(avroWriter, al.Count))
{
var avroRecords = new List<AvroRecord>();

foreach (var item in al)
{
dynamic record = new AvroRecord(serializer.WriterSchema);
record.Id = ((TestMsg)item).Id;
record.Amount = ((TestMsg)item).Amount;
record.OrderSubmitted = ((TestMsg)item).OrderSubmitted.Ticks;

seqWriter.Write(record);
}
}
}
dataStream.Dispose();
}
Console.WriteLine("Now reading file.");
using (var dataStream = new FileStream(filePath, FileMode.Open))
{
using (var avroReader = AvroContainer.CreateGenericReader(schema, dataStream, true, new CodecFactory()))
{
using (var seqReader = new SequentialReader<object>(avroReader))
{
foreach (var dynamicItem in seqReader.Objects)
{
dynamic obj = (dynamic)dynamicItem;

Console.WriteLine($" {obj.Id} - {obj.Amount} - {new DateTime(obj.OrderSubmitted).ToString()}");
}
}
}
dataStream.Dispose();
}
Console.ReadLine();
}
}

最新更新