如何计算批量导入CosmosDB任务所需的吞吐量



更新

我认为我的问题可能是客户的问题。通过关闭visual studio 2019并将其重新打开并运行,我可以完全加载一个文件(全部200万条记录)。但是,如果我再次尝试运行,它将失败,并出现如下所述的问题。

Azure DocumentDB偶尔抛出SocketException/GoneException

原始问题

我正在尝试在cosmos DB中大容量加载一些数据文件。我使用的是我在这里找到的一个例子:

https://github.com/Azure/azure-cosmosdb-bulkexecutor-dotnet-getting-started/tree/master/BulkImportSample

在导入示例中,他们使用循环制作假文档。在我的情况下,我正在从一个目录中读取CSV文件。每个CSV文件行将转换为一个CosmosDB文档。

默认吞吐量为400,但我将其增加到了10000。导入后我不需要它那么高,但我可以将它设置为批量导入任务所需的任何值。尽管如此,我还是遇到了某种吞吐量或节流问题,我对如何计算执行此数据导入所需的吞吐量感到困惑。这些CSV文件中的每一个都包含大约200万行,但每一行上只有10个标量值

它开始工作了。我从BulkExecutorTrace得到了一些输出,比如"BulkExecutitorTrace信息:0:分区索引0:2452|在1秒内以19342.86 RU/s的速度对2452个文档进行操作,有4个任务。面临0个节流">

但在19行这样的输出之后,我得到:

DocDBTrace警告:0:引发异常:mscorlib.dll中的"System.Threading.Tasks.TaskCanceledException"BulkExecutorTrace信息:0:RNTBD调用在通道192.168.11.105:19676->40.78.226.8:14319上超时。错误:ReceiveTimeout分区索引0:22068|在1秒内以9652.88 RU/s对1226个文档进行操作,共有20个任务。面对0个节流阀引发异常:Microsoft.Azure.Documents.Client.dll中的"Microsoft.Azure.Documents.TransportException"引发异常:mscorlib.dll中的"Microsoft.Azure.Documents.TransportException"引发异常:mscorlib.dll中的"Microsoft.Azure.Documents.TransportException"DocDBTrace信息:0:RequestAsync失败:RID:dbs/Diseases/colls/Diseases/sprocs/__.sys.commonBulkInsert,资源类型:StoredProcedure,操作:rntbd://cdb-ms-prod-eastus1-fd10.documents.azure.com:14319/apps/00e9d5e0-018e-43a2-b5a4-f41c78498cdb/services/61a05d2a-fb3-455f-864e-c9e10e85684c/partitions/92daa841-dcc5-40f0-9c21-2956cda4d2ac/replicas/132323601201339145p/,异常:Microsoft.Azure.Documents.TransportException:发生客户端传输错误:请求在等待服务器响应时超时。(时间:2020-04-26T18:18:32.9586759Z,活动ID:80910ba7-8b36-40fa-a3bf-3eac239b00e2,错误代码:ReceiveTimeout[0x0010],基本错误:HRESULT 0x80131500,URI:rntbd://cdb-ms-prod-eastus1-fd10.documents.azure.com:14319/apps/00e9d5e0-018e-43a2-b5a4-f41c78498cdb/services/61a05d2a-fb30-455f-864e-9e10e85684c/partitions/92daa841-dcc5-40f0-9c21-2956cda4d2ac/replicas/132323601201339145p/,连接:192.168.11.105:19676->40.78.226.8:14319,发送的有效负载:True,CPU历史记录:(2020-04-26T18:12.7679827Z 80.069),(2020-04-16T18:22.76676338Z 28.038),(202 0-04-26T18:18:22.7672671Z 0.000),(2020-04-26T18:18:22.7672671Z 0.000),(2020-04-16T18:132.7701961Z 20.629),CPU计数:8)位于Microsoft.Azure.Documents.Rntbd.Channel.d_13.MoveNext()---从引发异常的前一位置开始的堆栈结尾跟踪---在System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(任务任务)位于System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(任务任务)位于Microsoft.Azure.Documents.Rntbd.LoadBalancingPartition.d_9.MoveNext()

private async Task RunBulkImportAsync()
{
DocumentCollection dataCollection = null;
try
{
dataCollection = GetCollectionIfExists(client, DatabaseName, CollectionName);
if (dataCollection == null)
{
throw new Exception("The data collection does not exist");
}
}
catch (Exception de)
{
Trace.TraceError("Unable to initialize, exception message: {0}", de.Message);
throw;
}
string partitionKeyProperty = dataCollection.PartitionKey.Paths[0].Replace("/", "");
// Set retry options high for initialization (default values).
client.ConnectionPolicy.RetryOptions.MaxRetryWaitTimeInSeconds = 30;
client.ConnectionPolicy.RetryOptions.MaxRetryAttemptsOnThrottledRequests = 9;
IBulkExecutor bulkExecutor = new BulkExecutor(client, dataCollection);
await bulkExecutor.InitializeAsync();
// Set retries to 0 to pass control to bulk executor.
client.ConnectionPolicy.RetryOptions.MaxRetryWaitTimeInSeconds = 0;
client.ConnectionPolicy.RetryOptions.MaxRetryAttemptsOnThrottledRequests = 0;
BulkImportResponse bulkImportResponse = null;
long totalNumberOfDocumentsInserted = 0;
double totalRequestUnitsConsumed = 0;
double totalTimeTakenSec = 0;
var tokenSource = new CancellationTokenSource();
var token = tokenSource.Token;
foreach (string d in Directory.GetDirectories(RootPath).Take(1))
{
foreach (string f in Directory.GetFiles(d).Take(1))
{
Trace.WriteLine("Processing file, " + f);
var lines = File.ReadAllLines(f);
Trace.WriteLine("File has " + lines.Count() + "lines");
List<RowToImport> dataToImport = lines
.Skip(1)
.Select(v => RowToImport.FromCsv(v))
.ToList();
List<string> documentsToImportInBatch = dataToImport.Select(dti => GenerateJsonDocument(Guid.NewGuid().ToString(), dti.Disease, dti.Year, dti.Age, dti.Country, dti.CountryName, dti.CohortSize, dti.DeathsCongenital)).ToList();
// Invoke bulk import API.
var tasks = new List<Task>();
tasks.Add(Task.Run(async () =>
{
Trace.TraceInformation(String.Format("Executing bulk import for batch {0}", f));
do
{
try
{
bulkImportResponse = await bulkExecutor.BulkImportAsync(
documents: documentsToImportInBatch,
enableUpsert: true,
disableAutomaticIdGeneration: true,
maxConcurrencyPerPartitionKeyRange: 100,
maxInMemorySortingBatchSize: null,
cancellationToken: token);
}
catch (DocumentClientException de)
{
Trace.TraceError("Document client exception: {0}", de);
//Console.WriteLine("Document client exception: {0} {1}", f, de);
break;
}
catch (Exception e)
{
Trace.TraceError("Exception: {0}", e);
//Console.WriteLine("Exception: {0} {1}", f, e);
break;
}
} while (bulkImportResponse.NumberOfDocumentsImported < documentsToImportInBatch.Count) ;

Trace.WriteLine(String.Format("nSummary for batch {0}:", f));
Trace.WriteLine("--------------------------------------------------------------------- ");
Trace.WriteLine(String.Format("Inserted {0} docs @ {1} writes/s, {2} RU/s in {3} sec",
bulkImportResponse.NumberOfDocumentsImported,
Math.Round(bulkImportResponse.NumberOfDocumentsImported / bulkImportResponse.TotalTimeTaken.TotalSeconds),
Math.Round(bulkImportResponse.TotalRequestUnitsConsumed / bulkImportResponse.TotalTimeTaken.TotalSeconds),
bulkImportResponse.TotalTimeTaken.TotalSeconds));
Trace.WriteLine(String.Format("Average RU consumption per document: {0}",
(bulkImportResponse.TotalRequestUnitsConsumed / bulkImportResponse.NumberOfDocumentsImported)));
Trace.WriteLine("---------------------------------------------------------------------n ");
totalNumberOfDocumentsInserted += bulkImportResponse.NumberOfDocumentsImported;
totalRequestUnitsConsumed += bulkImportResponse.TotalRequestUnitsConsumed;
totalTimeTakenSec += bulkImportResponse.TotalTimeTaken.TotalSeconds;
},
token));
await Task.WhenAll(tasks);
}
Trace.WriteLine("Overall summary:");
Trace.WriteLine("--------------------------------------------------------------------- ");
Trace.WriteLine(String.Format("Inserted {0} docs @ {1} writes/s, {2} RU/s in {3} sec",
totalNumberOfDocumentsInserted,
Math.Round(totalNumberOfDocumentsInserted / totalTimeTakenSec),
Math.Round(totalRequestUnitsConsumed / totalTimeTakenSec),
totalTimeTakenSec));
Trace.WriteLine(String.Format("Average RU consumption per document: {0}",
(totalRequestUnitsConsumed / totalNumberOfDocumentsInserted)));
Trace.WriteLine("--------------------------------------------------------------------- ");

Trace.WriteLine("nPress any key to exit.");
Console.ReadKey();
}
}

在目录和文件循环中,我现在使用Take(1)只是为了让一个文件工作。但是,实际上有4个目录,每个目录中几乎有100个文件。

你能给我什么建议吗?我需要如何节流才能导入所有这些数据?

该异常与吞吐量无关。该异常指向超时/连接问题,该问题在SDK故障排除页面中引用

在您的例外中,我们可以看到有一个CPU峰值:

CPU history: 
(2020-04-26T18:18:12.7679827Z 80.069), 
(2020-04-26T18:18:22.7667638Z 28.038), 
(2020-04-26T18:18:22.7672671Z 100.000), 
(2020-04-26T18:18:22.7672671Z 0.000), 
(2020-04-26T18:18:22.7672671Z 0.000), 
(2020-04-26T18:18:32.7701961Z 20.629)

这可能会导致连接问题。如果您在本地开发机器上运行此程序,请查看其他哪些进程可能正在消耗CPU。如果这是在VM中运行的,它可能需要更大的CPU池。

此外,根据您的代码,您正在从并发操作中使用批量执行器(您正在并行创建多个任务)。

批量执行器性能提示表明不应该这样做:

由于单个批量操作API执行会消耗客户端计算机的大量CPU和网络IO(这是通过在内部生成多个任务来实现的)。避免在执行批量操作API调用的应用程序进程中生成多个并发任务。如果在单个虚拟机上运行的单个批量操作API调用无法消耗整个容器的吞吐量(如果容器的吞吐量>100万RU/s),则最好创建单独的虚拟机来同时执行批量操作API调用。

最新更新