只有当数据较少时才异步



我正在读取azure表的数据——大约5k个表,并收集不同的指标,并将它们保存回其他一些azure表中,一切都以异步方式进行。我面临的问题是,当偶尔会出现大量数据时,应用程序就会挂起。相同的代码在数据较少的情况下运行良好。我正在做的步骤(所有这些都是异步的,使用Rx、async和wait)是

  1. 从Azure读取所有表名
  2. 读取之前度量数据的所有表(1&2并行-Task.WhenAll)
  3. 从每个表中获取数据,处理并保存(Task.WhenAll)

我想要的是,使用异步直到它不会使我的应用程序挂起。如果有比可以处理的数据更多的数据,它不应该再读取任何表的数据,而应该专注于完成可用的数据处理。

Parallel.ForEach处理好了吗?

代码:根据Stephen Cleary编辑,仍然不适用于所有表。而它适用于500张桌子,

我认为是数据量导致应用程序(控制台应用程序)停滞,而不是线程数量。(一个线程最终可能检索到数以千计的百万行,每千行都会传递给一个方法,并将其计数添加到字典中,因此在需要更多内存时可能会被垃圾收集)还是我实现Semaphoreslim的方式错了?

public async Task CalculateMetricsForAllTablesAsync()
{
var allWizardTableNamesTask = GetAllWizardTableNamesAsync();
var allTablesNamesWithLastRunTimeTask = GetAllTableNamesWithLastRunTimeAsync();
await Task.WhenAll(allWizardTableNamesTask, allTablesNamesWithLastRunTimeTask).ConfigureAwait(false);
var allWizardTableNames = allWizardTableNamesTask.Result;
var allTablesNamesWithLastRunTime = allTablesNamesWithLastRunTimeTask.Result;
var throttler = new SemaphoreSlim(10);
var concurrentTableProcessingTasks = new ConcurrentStack<Task>();
foreach (var tname in allWizardTableNames)
{
await throttler.WaitAsync();
try
{
concurrentTableProcessingTasks.Push(ProcessTableDataAsync(tname, getTableNameWithLastRunTime(tname)));
}
finally
{
throttler.Release();
}
}
await Task.WhenAll(concurrentTableProcessingTasks).ConfigureAwait(false);
}
private async Task ProcessTableDataAsync(string tableName, Tuple<string, string> matchingTable)
{
var tableDataRetrieved = new TaskCompletionSource<bool>();
var metricCountsForEachDay = new ConcurrentDictionary<string, Tuple<int, int>>();
_fromATS.GetTableDataAsync<DynamicTableEntity>(tableName, GetFilter(matchingTable))
.Subscribe(entities => ProcessWizardDataChunk(metricCountsForEachDay, entities), () => tableDataRetrieved.TrySetResult(true));
await tableDataRetrieved.Task;
await SaveMetricDataAsync(tableName, metricCountsForEachDay).ConfigureAwait(false);
}

由于async正在包装Rx,我建议在async级别进行节流。您可以通过定义SemaphoreSlim并将方法逻辑封装在WaitAsync/Release中来实现这一点。

或者,考虑TPL数据流。Dataflow具有内置的节流选项(MaxDegreeOfParallelism),还可以与async和Rx自然地进行互操作。

相关内容

  • 没有找到相关文章

最新更新