Read a Parquet file from an Azure blob without downloading it locally (C# / .NET)



We have a Parquet file (500 MB) stored in an Azure blob. How can we read the file directly from the blob and keep it in memory in C#, for example as a DataTable?

I am able to read a Parquet file that physically sits in a local folder with the following code.

// Requires: using System; using System.IO; using System.Linq;
//           using Parquet; using Parquet.Data;
public void ReadParqueFile()
{
    using (Stream fileStream = System.IO.File.OpenRead("D:/../userdata1.parquet"))
    {
        using (var parquetReader = new ParquetReader(fileStream))
        {
            // Column definitions from the file schema.
            DataField[] dataFields = parquetReader.Schema.GetDataFields();
            for (int i = 0; i < parquetReader.RowGroupCount; i++)
            {
                using (ParquetRowGroupReader groupReader = parquetReader.OpenRowGroupReader(i))
                {
                    // Read every column of the current row group.
                    DataColumn[] columns = dataFields.Select(groupReader.ReadColumn).ToArray();
                    DataColumn firstColumn = columns[0];
                    Array data = firstColumn.Data;
                    //int[] ids = (int[])data;
                }
            }
        }
    }
}

(I am able to read a CSV file directly from the blob using a source stream.) Please suggest the fastest way to read a Parquet file directly from a blob.

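In my experience, the way to read a Parquet file directly from a blob is to first generate a blob URL with a SAS token, then use HttpClient to get a stream from that URL, and finally read the HTTP response stream with ParquetReader.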

First, refer to the sample code below, taken from the "Create a service SAS for a blob" section of the official document "Create a service SAS for a container or blob with .NET", which uses the Azure Blob Storage SDK for .NET Core.

private static string GetBlobSasUri(CloudBlobContainer container, string blobName, string policyName = null)
{
    string sasBlobToken;
    // Get a reference to a blob within the container.
    // Note that the blob may not exist yet, but a SAS can still be created for it.
    CloudBlockBlob blob = container.GetBlockBlobReference(blobName);
    if (policyName == null)
    {
        // Create a new access policy and define its constraints.
        // Note that the SharedAccessBlobPolicy class is used both to define the parameters of an ad hoc SAS, and
        // to construct a shared access policy that is saved to the container's shared access policies.
        SharedAccessBlobPolicy adHocSAS = new SharedAccessBlobPolicy()
        {
            // When the start time for the SAS is omitted, the start time is assumed to be the time when the storage service receives the request.
            // Omitting the start time for a SAS that is effective immediately helps to avoid clock skew.
            SharedAccessExpiryTime = DateTime.UtcNow.AddHours(24),
            Permissions = SharedAccessBlobPermissions.Read | SharedAccessBlobPermissions.Write | SharedAccessBlobPermissions.Create
        };
        // Generate the shared access signature on the blob, setting the constraints directly on the signature.
        sasBlobToken = blob.GetSharedAccessSignature(adHocSAS);
        Console.WriteLine("SAS for blob (ad hoc): {0}", sasBlobToken);
        Console.WriteLine();
    }
    else
    {
        // Generate the shared access signature on the blob. In this case, all of the constraints for the
        // shared access signature are specified on the container's stored access policy.
        sasBlobToken = blob.GetSharedAccessSignature(null, policyName);
        Console.WriteLine("SAS for blob (stored access policy): {0}", sasBlobToken);
        Console.WriteLine();
    }
    // Return the URI string for the blob, including the SAS token.
    return blob.Uri + sasBlobToken;
}

Then use HttpClient to get the HTTP response stream from the URL with the SAS token.

var blobUrlWithSAS = GetBlobSasUri(container, blobName);
var client = new HttpClient();
var stream = await client.GetStreamAsync(blobUrlWithSAS);

Finally, read it with ParquetReader. The code comes from the "Reading Data" section of the GitHub repository aloneguid/parquet-dotnet.

var options = new ParquetOptions { TreatByteArrayAsString = true };
var reader = new ParquetReader(stream, options);
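
One caveat worth checking before relying on the snippets above: as far as I know, ParquetReader expects a stream that is both readable and seekable (the Parquet footer lives at the end of the file), while the stream returned by HttpClient.GetStreamAsync is typically not seekable. The following is only a minimal sketch of one way around that, assuming the same Parquet.Net 3.x-style synchronous API used in the question; newer releases of the library may expose a different (async) API. The class name BlobParquetSketch, the method ReadAllRowGroupsAsync, and the parameter blobUrlWithSas are hypothetical names introduced for illustration. It buffers the whole blob in a MemoryStream, so nothing is written to disk, but the full 500 MB is held in memory.

```csharp
using System;
using System.IO;
using System.Linq;
using System.Net.Http;
using System.Threading.Tasks;
using Parquet;
using Parquet.Data;

public static class BlobParquetSketch
{
    // blobUrlWithSas is a hypothetical parameter: the blob URL plus SAS token,
    // e.g. the value returned by GetBlobSasUri above.
    public static async Task<DataColumn[][]> ReadAllRowGroupsAsync(string blobUrlWithSas)
    {
        using (var client = new HttpClient())
        using (Stream httpStream = await client.GetStreamAsync(blobUrlWithSas))
        using (var buffer = new MemoryStream())
        {
            // Copy the HTTP response into memory so the reader gets a seekable stream.
            await httpStream.CopyToAsync(buffer);
            buffer.Position = 0;

            var options = new ParquetOptions { TreatByteArrayAsString = true };
            using (var reader = new ParquetReader(buffer, options))
            {
                DataField[] fields = reader.Schema.GetDataFields();
                var rowGroups = new DataColumn[reader.RowGroupCount][];
                for (int i = 0; i < reader.RowGroupCount; i++)
                {
                    using (ParquetRowGroupReader groupReader = reader.OpenRowGroupReader(i))
                    {
                        // One DataColumn per field for this row group.
                        rowGroups[i] = fields.Select(groupReader.ReadColumn).ToArray();
                    }
                }
                return rowGroups;
            }
        }
    }
}
```

A caller would await ReadAllRowGroupsAsync(GetBlobSasUri(container, blobName)) and then copy each column's Data array into whatever in-memory structure is needed, such as the DataTable mentioned in the question.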
