如何避免在并发数据处理期间耗尽 RAM



我在数据并发处理方面遇到问题。我的电脑内存很快用完了。关于如何修复我的并发实现的任何建议?

通用类:

public class CalculationResult
{
public int Count { get; set; }
public decimal[] RunningTotals { get; set; }
public CalculationResult(decimal[] profits)
{
this.Count = 1;
this.RunningTotals = new decimal[12];
profits.CopyTo(this.RunningTotals, 0);
}
public void Update(decimal[] newData)
{
this.Count++;
// summ arrays
for (int i = 0; i < 12; i++)
this.RunningTotals[i] = this.RunningTotals[i] + newData[i];
}
public void Update(CalculationResult otherResult)
{
this.Count += otherResult.Count;
// summ arrays
for (int i = 0; i < 12; i++)
this.RunningTotals[i] = this.RunningTotals[i] + otherResult.RunningTotals[i];
}
}

代码的单核实现如下:

Dictionary<string, CalculationResult> combinations = new Dictionary<string, CalculationResult>();
foreach (var i in itterations)
{
// do the processing
// ..
string combination = "1,2,3,4,42345,52,523"; // this is determined during the processing
if (combinations.ContainsKey(combination))
combinations[combination].Update(newData);
else
combinations.Add(combination, new CalculationResult(newData));
}

多核实现:

ConcurrentBag<Dictionary<string, CalculationResult>> results = new ConcurrentBag<Dictionary<string, CalculationResult>>();
Parallel.ForEach(itterations, (i, state) => 
{
Dictionary<string, CalculationResult> combinations = new Dictionary<string, CalculationResult>();
// do the processing
// ..
// add combination to combinations -> same logic as in single core implementation
results.Add(combinations);
});
Dictionary<string, CalculationResult> combinationsReal = new Dictionary<string, CalculationResult>();
foreach (var item in results)
{
foreach (var pair in item)
{
if (combinationsReal.ContainsKey(pair.Key))
combinationsReal[pair.Key].Update(pair.Value);
else
combinationsReal.Add(pair.Key, pair.Value);
}
}

我遇到的问题是,几乎每个combinations字典最终都会包含930k条记录,平均消耗400 [MB]RAM内存。

现在,在单核实现中只有一个这样的字典。所有检查都针对一个字典执行。但这是一种缓慢的方法,我想使用多核优化。

在多核实现中,创建了一个ConcurrentBag实例,用于保存所有combinations字典。多线程作业完成后,所有字典将聚合为一个。此方法适用于少量并发迭代。例如,对于 4 次迭代,我的RAM使用率为~ 1.5 [GB]。当我设置完整的并行迭代量(即 200)时,问题出现了!再多的PCRAM也不足以容纳所有词典,每本词典有数百万条记录!

我一直在考虑使用ConcurrentDictioanary,直到我发现"TryAdd"方法不能保证我的情况下添加数据的完整性,因为我还需要对正在运行的总计运行更新。

唯一真正的多线程选项是,而不是将所有combinations添加到字典中 - 是将它们保存到某个数据库中。然后,数据聚合将是带有group by子句的 1 个 SQLselect语句的问题......但我不喜欢为此创建一个临时表并运行数据库实例的想法。

是否有关于如何并发处理数据而不耗尽 RAM 的方法?

编辑: 也许真正的问题应该是 - 使用ConcurrentDictionary时如何使RunningTotals更新线程安全?我刚刚遇到了这个线程ConcurrentDictionary也有类似的问题,但我的情况似乎更复杂,因为我有一个需要更新的数组。我还在调查这件事。

编辑2:这是一个带有ConcurrentDictionary的工作解决方案。我需要做的就是为字典键添加一个锁。

ConcurrentDictionary<string, CalculationResult> combinations = new ConcurrentDictionary<string, CalculationResult>();
Parallel.ForEach(itterations, (i, state) => 
{
// do the processing
// ..
string combination = "1,2,3,4,42345,52,523"; // this is determined during the processing
if (combinations.ContainsKey(combination)) {
lock(combinations[combination])
combinations[combination].Update(newData);
}
else
combinations.TryAdd(combination, new CalculationResult(newData));
});

单线程代码执行时间为1m 48s,而此解决方案执行时间为 4 次迭代1m 7s(性能提高 37%)。我仍然想知道SQL方法是否会更快,有数百万条记录?我可能会在明天进行测试并更新。

编辑 3:对于那些想知道值的ConcurrentDictionary更新有什么问题的人 - 在有锁和没有锁的情况下运行此代码。

public class Result
{
public int Count { get; set; }
}
class Program
{
static void Main(string[] args)
{
Console.WriteLine("Start");
List<int> keys = new List<int>();
for (int i = 0; i < 100; i++)
keys.Add(i);
ConcurrentDictionary<int, Result> dict = new ConcurrentDictionary<int, Result>();
Parallel.For(0, 8, i =>
{
foreach(var key in keys)
{
if (dict.ContainsKey(key))
{
//lock (dict[key]) // uncomment this
dict[key].Count++;
}
else
dict.TryAdd(key, new Result());
}
});
// any output here is incorrect behavior. best result = no lines
foreach (var item in dict)
if (item.Value.Count != 7) { Console.WriteLine($"{item.Key}; {item.Value.Count}"); }
Console.WriteLine($"Finish");
Console.ReadKey();
}
}

编辑4:经过反复试验,我无法优化SQL方法。事实证明,这是最糟糕的主意:)我使用了一个SQL Lite数据库。内存中和文件中。使用事务和可重用的 SQL 命令参数。由于需要插入大量记录 - 性能不足。数据聚合是最简单的部分,但仅插入 4 百万行就需要花费大量时间,我什至无法想象如何有效地处理 2.4 亿条数据。到目前为止(也很奇怪),ConcurrentBag方法似乎是我电脑上最快的。其次是ConcurrentDictionary方法。 不过,ConcurrentBag内存有点重。感谢@Alisson的工作 - 现在可以将其用于更大的迭代集!

因此,您只需要确保不超过 4 次并发迭代,这是计算机资源的限制,并且仅使用此计算机,就没有魔法。

我创建了一个类来控制并发执行及其将执行的并发任务数。

该类将保存以下属性:

public class ConcurrentCalculationProcessor
{
private const int MAX_CONCURRENT_TASKS = 4;
private readonly IEnumerable<int> _codes;
private readonly List<Task<Dictionary<string, CalculationResult>>> _tasks;
private readonly Dictionary<string, CalculationResult> _combinationsReal;
public ConcurrentCalculationProcessor(IEnumerable<int> codes)
{
this._codes = codes;
this._tasks = new List<Task<Dictionary<string, CalculationResult>>>();
this._combinationsReal = new Dictionary<string, CalculationResult>();
}
}

我使并发任务的数量成为const,但它可以是构造函数中的一个参数

我创建了一个处理处理的方法。出于测试目的,我模拟了 900k iten 的循环,将它们添加到字典中,最后返回它们:

private async Task<Dictionary<string, CalculationResult>> ProcessCombinations()
{
Dictionary<string, CalculationResult> combinations = new Dictionary<string, CalculationResult>();
// do the processing
// here we should do something that worth using concurrency
// like querying databases, consuming APIs/WebServices, and other I/O stuff
for (int i = 0; i < 950000; i++)
combinations[i.ToString()] = new CalculationResult(new decimal[] { 1, 10, 15 });
return await Task.FromResult(combinations);
}

main 方法将并行启动任务,将它们添加到任务列表中,因此我们最近可以跟踪它们。

每次列表达到最大并发任务数时,我们都会await一个名为ProcessRealCombinations.

public async Task<Dictionary<string, CalculationResult>> Execute()
{
ConcurrentBag<Dictionary<string, CalculationResult>> results = new ConcurrentBag<Dictionary<string, CalculationResult>>();
for (int i = 0; i < this._codes.Count(); i++)
{
// start the task imediately
var task = ProcessCombinations();
this._tasks.Add(task);
if (this._tasks.Count() >= MAX_CONCURRENT_TASKS)
{
// if we have more than MAX_CONCURRENT_TASKS in progress, we start processing some of them
// this will await any of the current tasks to complete, them process it (and any other task which may have been completed as well)...
await ProcessCompletedTasks().ConfigureAwait(false);
}
}
// keep processing until all the pending tasks have been completed...it should be no more than MAX_CONCURRENT_TASKS
while(this._tasks.Any())
await ProcessCompletedTasks().ConfigureAwait(false);
return this._combinationsReal;
}

下一个方法ProcessCompletedTasks将等待至少一个现有任务完成。之后,它将从列表中获取所有已完成的任务(已完成的任务和可能一起完成的任何其他任务),并获得它们的结果(组合)。

对于每个processedCombinations,它将与this._combinationsReal合并(使用您在问题中提供的相同逻辑)。

private async Task ProcessCompletedTasks()
{
await Task.WhenAny(this._tasks).ConfigureAwait(false);
var completedTasks = this._tasks.Where(t => t.IsCompleted).ToArray();
// completedTasks will have at least one task, but it may have more ;)
foreach (var completedTask in completedTasks)
{
var processedCombinations = await completedTask.ConfigureAwait(false);
foreach (var pair in processedCombinations)
{
if (this._combinationsReal.ContainsKey(pair.Key))
this._combinationsReal[pair.Key].Update(pair.Value);
else
this._combinationsReal.Add(pair.Key, pair.Value);
}
this._tasks.Remove(completedTask);
}
}

对于合并在_combinationsReal中的每个processedCombinations,它将从列表中删除其各自的任务,然后继续(再次开始添加更多任务)。这将发生,直到我们为所有迭代创建了所有任务。

最后,我们继续处理它,直到列表中没有更多任务。


如果您监控 RAM 消耗,您会注意到它会增加到大约 1.5 GB(当我们同时处理 4 个任务时),然后减少到大约 0.8 GB(当我们从列表中删除任务时)。至少这是我电脑中发生的事情。

这是一个小提琴,但是我不得不将 iten 的数量从 900k 减少到 100,因为小提琴限制了内存的使用以避免滥用。

我希望这以某种方式对您有所帮助。


关于所有这些东西,需要注意的一件事是,如果您的ProcessCombinations(处理这 900k 个项目时并发执行的方法)调用外部资源,例如从 HD 读取文件、在数据库中执行查询、调用 API/WebService 方法,您将受益于使用并发任务。我猜代码可能正在从外部资源读取 900k 个项目,那么这将减少处理它所需的时间。

如果这些项目之前已加载并且ProcessCombinations只是读取内存中已有的数据,那么并发性根本无济于事(实际上我相信这会使您的代码运行速度变慢)。如果是这种情况,那么我们在错误的地方应用并发。

当所述调用将访问外部资源(获取或存储数据)时,并行使用async调用可能会提供更多帮助,并且根据外部资源可以支持的并发调用数,它可能仍然不会产生这样的差异。

最新更新