C#:从 ADLS gen2 blob 下载大型 json 文件,并反序列化为对象



我使用以下代码将数据从 blob 输出到流。

private static async Task<Stream> ParallelDownloadBlobAsync(Stream outPutStream, CloudBlockBlob blob)
{
await blob.FetchAttributesAsync();
int bufferLength = 1 * 1024 * 1024;//1 MB chunk
long blobRemainingLength = blob.Properties.Length;
Queue<KeyValuePair<long, long>> queues = new Queue<KeyValuePair<long, long>>();
long offset = 0;
while (blobRemainingLength > 0)
{
long chunkLength = (long)Math.Min(bufferLength, blobRemainingLength);
queues.Enqueue(new KeyValuePair<long, long>(offset, chunkLength));
offset += chunkLength;
blobRemainingLength -= chunkLength;
}
Parallel.ForEach(queues, new ParallelOptions()
{
//Gets or sets the maximum number of concurrent tasks
MaxDegreeOfParallelism = 10
}, (queue) =>
{
using (var ms = new MemoryStream())
{
blob.DownloadRangeToStreamAsync(ms, queue.Key, queue.Value);
lock (outPutStream)
{
outPutStream.Position = queue.Key;
var bytes = ms.ToArray();
outPutStream.Write(bytes, 0, bytes.Length);
}
}
});
return outPutStream;
}

然后我使用 JsonSerializer 对数据进行去苟化,但在块未执行时

await ParallelDownloadBlobAsync(stream, cloudBlockBlob);
//resetting stream's position to 0
//stream.Position = 0;
var serializer = new JsonSerializer();
using (var sr = new StreamReader(stream))
{
using (var jsonTextReader = new JsonTextReader(sr))
{
jsonTextReader.SupportMultipleContent = true;
result = new List<T>();

while (jsonTextReader.Read())
{
result.Add(serializer.Deserialize<T>(jsonTextReader));
}
}
}

如果我使用DownloadToStreamAsync而不是并行下载(DownloadRangeToStreamAsync(,那么它有效。

我可以重现您的问题,这里的解决方案是在ParallelDownloadBlobAsync方法中,将这行代码blob.DownloadRangeToStreamAsync(ms, queue.Key, queue.Value);更改为blob.DownloadRangeToStream(ms, queue.Key, queue.Value);

不确定你和我的问题的根本原因是否相同。在我这边,根本原因是当文件很小(如 100kb(时,使用blob.DownloadRangeToStreamAsync方法时,输出流始终为 0,因此永远不会执行while condition。但是对于较大的文件,可以使用blob.DownloadRangeToStreamAsync方法。

如果无法解决您的问题,请发表评论。

最新更新