使用System.Text.Json异步反序列化列表



假设我请求一个包含许多对象列表的大型json文件。我不希望它们一下子就出现在记忆中,但我宁愿一个接一个地阅读和处理它们。因此,我需要将异步System.IO.Stream流转换为IAsyncEnumerable<T>。如何使用新的System.Text.JsonAPI来执行此操作?

private async IAsyncEnumerable<T> GetList<T>(Uri url, CancellationToken cancellationToken = default)
{
using (var httpResponse = await httpClient.GetAsync(url, cancellationToken))
{
using (var stream = await httpResponse.Content.ReadAsStreamAsync())
{
// Probably do something with JsonSerializer.DeserializeAsync here without serializing the entire thing in one go
}
}
}

TL;DR这不是微不足道的


看起来有人已经发布了Utf8JsonStreamReader结构的完整代码,该结构从流中读取缓冲区并将其提供给Utf8JsonRreader,从而可以轻松地使用JsonSerializer.Deserialize<T>(ref newJsonReader, options);进行反序列化。代码也不是琐碎的。相关问题在这里,答案在这里。

但这还不够——HttpClient.GetAsync只有在收到整个响应后才会返回,本质上是缓冲内存中的所有内容。

为了避免这种情况,HttpClient.GetAsync(字符串,HttpCompletionOption)应该与HttpCompletionOption.ResponseHeadersRead一起使用。

反序列化循环也应该检查取消令牌,如果有信号,则退出或抛出。否则,循环将继续,直到整个流被接收和处理。

此代码基于相关答案的示例,使用HttpCompletionOption.ResponseHeadersRead并检查取消令牌。它可以解析包含适当项目数组的JSON字符串,例如:

[{"prop1":123},{"prop1":234}]

jsonStreamReader.Read()的第一个调用移动到数组的开头,而第二个调用移动至第一个对象的开头。当检测到数组(])的末尾时,循环本身终止。

private async IAsyncEnumerable<T> GetList<T>(Uri url, CancellationToken cancellationToken = default)
{
//Don't cache the entire response
using var httpResponse = await httpClient.GetAsync(url,                               
HttpCompletionOption.ResponseHeadersRead,  
cancellationToken);
using var stream = await httpResponse.Content.ReadAsStreamAsync();
using var jsonStreamReader = new Utf8JsonStreamReader(stream, 32 * 1024);
jsonStreamReader.Read(); // move to array start
jsonStreamReader.Read(); // move to start of the object
while (jsonStreamReader.TokenType != JsonTokenType.EndArray)
{
//Gracefully return if cancellation is requested.
//Could be cancellationToken.ThrowIfCancellationRequested()
if(cancellationToken.IsCancellationRequested)
{
return;
}
// deserialize object
var obj = jsonStreamReader.Deserialize<T>();
yield return obj;
// JsonSerializer.Deserialize ends on last token of the object parsed,
// move to the first token of next object
jsonStreamReader.Read();
}
}

JSON片段,也就是流式JSON,也就是*

在事件流或日志记录场景中,将单个JSON对象附加到文件中是很常见的,每行一个元素,例如:

{"eventId":1}
{"eventId":2}
...
{"eventId":1234567}

这不是一个有效的JSON文档,但单个片段是有效的。这对于大数据/高度并发的场景有几个优点。添加新事件只需要在文件中追加一行,而不需要解析和重建整个文件处理,尤其是并行处理更容易,原因有两个:

  • 只需从流中读取一行,就可以一次检索一个单独的元素
  • 输入文件可以很容易地跨行划分和分割,将每个部分提供给一个单独的工作进程,例如Hadoop集群中的工作进程或应用程序中的不同线程:计算分割点,例如将长度除以工作进程的数量,然后查找第一条换行符。把所有到这一点的东西都交给一个单独的工人

使用StreamReader

实现这一点的方法是使用TextReader,一次读取一行,并使用JsonSerializer进行解析

using var reader=new StreamReader(stream);
string line;
//ReadLineAsync() doesn't accept a CancellationToken 
while((line=await reader.ReadLineAsync()) != null)
{
var item=JsonSerializer.Deserialize<T>(line);
yield return item;
if(cancellationToken.IsCancellationRequested)
{
return;
}
}

这比反序列化正确数组的代码要简单得多。有两个问题:

  • ReadLineAsync不接受取消令牌
  • 每次迭代都会分配一个新字符串,这是我们希望通过使用System.Text.Json来避免的事情之一

尽管试图生成JsonSerializer所需的ReadOnlySpan<Byte>缓冲区,但这可能已经足够了。反序列化并非易事。

管道和序列读取器

为了避免所有位置,我们需要从流中获取ReadOnlySpan<byte>。要执行此操作,需要使用System.IO.Pipeline管道和SequenceReader结构。Steve Gordon的《SequenceReader简介》解释了如何使用此类使用分隔符从流中读取数据。

不幸的是,SequenceReader是一个ref结构,这意味着它不能用于异步或本地方法。这就是为什么Steve Gordon在他的文章中创建了一个

private static SequencePosition ReadItems(in ReadOnlySequence<byte> sequence, bool isCompleted)

方法读取ReadOnlySequence中的项并返回结束位置,这样PipeReader就可以从中恢复。不幸的是我们想要返回IEnumerable或IAsyncEnumerable,迭代器方法也不喜欢inout参数。

我们可以在列表或队列中收集反序列化的项,并将其作为单个结果返回,但这仍然会分配列表、缓冲区或节点,并且必须等待缓冲区中的所有项进行反序列化才能返回:

private static (SequencePosition,List<T>) ReadItems(in ReadOnlySequence<byte> sequence, bool isCompleted)

我们需要类似枚举的东西,它不需要迭代器方法,可以使用async,并且不会以这种方式缓冲所有内容。

添加通道以生成IAsyncEnumerable

ChannelReader.ReadAllAsync返回一个IAsyncEnumerable。我们可以从不能作为迭代器工作的方法返回ChannelReader,并且仍然在没有缓存的情况下生成元素流。

将Steve Gordon的代码改编为使用通道,我们得到ReadItems(ChannelWriter…)和ReadLastItem方法。第一个,一次读取一个项目,直到使用ReadOnlySpan<byte> itemBytes的换行符。这可由JsonSerializer.Deserialize使用。如果ReadItems找不到分隔符,它会返回其位置,以便PipelineReader可以从流中提取下一个块。

当我们到达最后一个块并且没有其他分隔符时,ReadLastItem`读取剩余的字节并反序列化它们。

代码几乎与Steve Gordon的相同。我们不是向控制台写入,而是向ChannelWriter写入。

private const byte NL=(byte)'n';
private const int MaxStackLength = 128;
private static SequencePosition ReadItems<T>(ChannelWriter<T> writer, in ReadOnlySequence<byte> sequence, 
bool isCompleted, CancellationToken token)
{
var reader = new SequenceReader<byte>(sequence);
while (!reader.End && !token.IsCancellationRequested) // loop until we've read the entire sequence
{
if (reader.TryReadTo(out ReadOnlySpan<byte> itemBytes, NL, advancePastDelimiter: true)) // we have an item to handle
{
var item=JsonSerializer.Deserialize<T>(itemBytes);
writer.TryWrite(item);            
}
else if (isCompleted) // read last item which has no final delimiter
{
var item = ReadLastItem<T>(sequence.Slice(reader.Position));
writer.TryWrite(item);
reader.Advance(sequence.Length); // advance reader to the end
}
else // no more items in this sequence
{
break;
}
}
return reader.Position;
}
private static T ReadLastItem<T>(in ReadOnlySequence<byte> sequence)
{
var length = (int)sequence.Length;
if (length < MaxStackLength) // if the item is small enough we'll stack allocate the buffer
{
Span<byte> byteBuffer = stackalloc byte[length];
sequence.CopyTo(byteBuffer);
var item=JsonSerializer.Deserialize<T>(byteBuffer);
return item;        
}
else // otherwise we'll rent an array to use as the buffer
{
var byteBuffer = ArrayPool<byte>.Shared.Rent(length);
try
{
sequence.CopyTo(byteBuffer);
var item=JsonSerializer.Deserialize<T>(byteBuffer);
return item;
}
finally
{
ArrayPool<byte>.Shared.Return(byteBuffer);
}
}    
}

DeserializeToChannel<T>方法在流的顶部创建一个管道读取器,创建一个通道并启动一个工作任务,该任务解析块并将它们推送到通道:

ChannelReader<T> DeserializeToChannel<T>(Stream stream, CancellationToken token)
{
var pipeReader = PipeReader.Create(stream);    
var channel=Channel.CreateUnbounded<T>();
var writer=channel.Writer;
_ = Task.Run(async ()=>{
while (!token.IsCancellationRequested)
{
var result = await pipeReader.ReadAsync(token); // read from the pipe
var buffer = result.Buffer;
var position = ReadItems(writer,buffer, result.IsCompleted,token); // read complete items from the current buffer
if (result.IsCompleted) 
break; // exit if we've read everything from the pipe
pipeReader.AdvanceTo(position, buffer.End); //advance our position in the pipe
}
pipeReader.Complete(); 
},token)
.ContinueWith(t=>{
pipeReader.Complete();
writer.TryComplete(t.Exception);
});
return channel.Reader;
}

ChannelReader.ReceiveAllAsync()可用于通过IAsyncEnumerable<T>:消耗所有项目

var reader=DeserializeToChannel<MyEvent>(stream,cts.Token);
await foreach(var item in reader.ReadAllAsync(cts.Token))
{
//Do something with it 
}    

是的,一个真正的流式JSON(反)序列化程序在很多地方都是一个很好的性能改进。

不幸的是,System.Text.Json在我写这篇文章的时候并没有做到这一点。我不确定将来是否会这样——我希望如此!事实证明,真正的JSON流式反序列化相当具有挑战性。

也许你可以检查一下速度极快的Utf8Json是否支持它。

然而,可能会有一个针对您特定情况的自定义解决方案,因为您的需求似乎限制了难度。

这个想法是一次从数组中手动读取一个项目。我们正在利用这样一个事实,即列表中的每个项本身都是一个有效的JSON对象。

您可以手动跳过[(针对第一个项目)或,(针对下一个项目)。然后,我认为您最好使用.NET Core的Utf8JsonReader来确定当前对象的结束位置,并将扫描的字节馈送到JsonDeserializer

通过这种方式,一次只对一个对象进行轻微的缓冲。

既然我们谈论的是性能,当你在PipeReader时,你可以从它获得输入。:-)

我知道这是一篇旧帖子,但最近在.Net 6 Preview 4中发布的System.Text.Json support for IAsyncEnumerable提供了解决OP.中提到的问题的方法

private async IAsyncEnumerable<T> GetList<T>(Uri url, CancellationToken cancellationToken = default)
{
using (var httpResponse = await httpClient.GetAsync(url, cancellationToken))
{
using (var stream = await httpResponse.Content.ReadAsStreamAsync())
{
await foreach(var item in JsonSerializer.DeserializeAsyncEnumerable<T>(stream))
{
yield return item;
}
}
}
}

这将提供按需反序列化,并且在处理大数据时非常有用。请注意,目前该功能仅限于根级别的JSON数组。

有关该功能的更多详细信息,请访问

感觉您需要实现自己的流读取器。您必须逐个读取字节,并在对象定义完成后立即停止。这确实是相当低级的。因此,您不会将整个文件加载到RAM中,而是承担您正在处理的部分。这似乎是一个答案吗?

在.NET 5(C#9)中,不需要使用ChannelReader的多个任务,而是可以使用System.IO.Pipelines扩展包和System.Text.Json.JsonSerializer,如下所示:

using System;
using System.Buffers;
using System.Collections.Generic;
using System.IO;
using System.IO.Pipelines;
using System.Text;
using System.Text.Json;
using System.Threading.Tasks;
class Program
{
static readonly byte[] NewLineChars = {(byte)'r', (byte)'n'};
static readonly byte[] WhiteSpaceChars = {(byte)'r', (byte)'n', (byte)' ', (byte)'t'};
private static async Task Main()
{
JsonSerializerOptions jsonOptions = new(JsonSerializerDefaults.Web);
var json = "{"some":"thing1"}rn{"some":"thing2"}rn{"some":"thing3"}";
var contentStream = new MemoryStream(Encoding.UTF8.GetBytes(json));
var pipeReader = PipeReader.Create(contentStream);
await foreach (var foo in ReadItemsAsync<Foo>(pipeReader, jsonOptions))
{
Console.WriteLine($"foo: {foo.Some}");
}
}
static async IAsyncEnumerable<TValue> ReadItemsAsync<TValue>(PipeReader pipeReader, JsonSerializerOptions jsonOptions = null)
{
while (true)
{
var result = await pipeReader.ReadAsync();
var buffer = result.Buffer;
bool isCompleted = result.IsCompleted;
SequencePosition bufferPosition = buffer.Start;
while (true)
{
var(value, advanceSequence) = TryReadNextItem<TValue>(buffer, ref bufferPosition, isCompleted, jsonOptions);
if (value != null)
{
yield return value;
}
if (advanceSequence)
{
pipeReader.AdvanceTo(bufferPosition, buffer.End); //advance our position in the pipe
break;
}
}
if (isCompleted)
yield break;
}
}
static (TValue, bool) TryReadNextItem<TValue>(ReadOnlySequence<byte> sequence, ref SequencePosition sequencePosition, bool isCompleted, JsonSerializerOptions jsonOptions)
{
var reader = new SequenceReader<byte>(sequence.Slice(sequencePosition));
while (!reader.End) // loop until we've come to the end or read an item
{
if (reader.TryReadToAny(out ReadOnlySpan<byte> itemBytes, NewLineChars, advancePastDelimiter: true))
{
sequencePosition = reader.Position;
if (itemBytes.TrimStart(WhiteSpaceChars).IsEmpty)
{
continue;
}
return (JsonSerializer.Deserialize<TValue>(itemBytes, jsonOptions), false);
}
else if (isCompleted)
{
// read last item
var remainingReader = sequence.Slice(reader.Position);
using var memoryOwner = MemoryPool<byte>.Shared.Rent((int)reader.Remaining);
remainingReader.CopyTo(memoryOwner.Memory.Span);
reader.Advance(remainingReader.Length); // advance reader to the end
sequencePosition = reader.Position;
if (!itemBytes.TrimStart(WhiteSpaceChars).IsEmpty)
{
return (JsonSerializer.Deserialize<TValue>(memoryOwner.Memory.Span, jsonOptions), true);
}
else
{
return (default, true);
}
}
else
{
// no more items in sequence
break;
}
}
// PipeReader needs to read more
return (default, true);
}
}
public class Foo
{
public string Some
{
get;
set;
}
}

运行时间https://dotnetfiddle.net/6j3KGg

也许您可以使用Newtonsoft.Json序列化程序?https://www.newtonsoft.com/json/help/html/Performance.htm

特别参见章节:

优化内存使用

编辑

您可以尝试反序列化JsonTextReader中的值,例如

using (var textReader = new StreamReader(stream))
using (var reader = new JsonTextReader(textReader))
{
while (await reader.ReadAsync(cancellationToken))
{
yield return reader.Value;
}
}

最新更新