我正在尝试编写一个程序,通过将它们放入来自不同线程的集合中并在迭代集合并释放项目的单个线程中清理它们来计划删除项目。
在这样做之前,我想知道什么会产生最佳性能,所以我尝试了ConcurrentBag
、ConcurrentStack
和ConcurrentQueue
,并测量了添加 10,000,000 个项目所需的时间。
我使用以下程序来测试这一点:
class Program
{
static List<int> list = new List<int>();
static ConcurrentBag<int> bag = new ConcurrentBag<int>();
static ConcurrentStack<int> stack = new ConcurrentStack<int>();
static ConcurrentQueue<int> queue = new ConcurrentQueue<int>();
static void Main(string[] args)
{
run(addList);
run(addBag);
run(addStack);
run(addQueue);
Console.ReadLine();
}
private static void addList(int obj) { lock (list) { list.Add(obj); } }
private static void addStack(int obj) { stack.Push(obj); }
private static void addQueue(int obj) { queue.Enqueue(obj); }
private static void addBag(int obj) { bag.Add(obj); }
private static void run(Action<int> action)
{
Stopwatch stopwatch = Stopwatch.StartNew();
Parallel.For(0, 10000000, new ParallelOptions() { MaxDegreeOfParallelism = # },
action);
stopwatch.Stop();
Console.WriteLine(action.Method.Name + " takes " + stopwatch.Elapsed);
}
}
其中 # 是使用的线程数。
但结果相当令人困惑:
有 8 个线程:
- addList 需要 00:00
- :00.8166816
- addBag 需要 00:00:01.0368712
- addStack 需要 00:00:01.0902852 addQueue 需要 00:00
- :00.6555039
使用 1 个线程:
- addList 需要 00:00
- :00.3880958
- addBag 需要 00:00:01.5850249
- addStack 需要 00:00:01.2764924 addQueue 需要 00:00
- :00.4409501
因此,无论有多少线程,似乎只锁定一个普通的旧列表比使用任何并发集合都快,除了,如果队列需要处理大量写入,则可能是队列。
编辑:在下面关于垃圾和调试构建的评论之后:是的,这会影响基准测试。调试版本的影响将是线性的,垃圾将随着内存使用率的增加而增加。
然而,多次运行相同的测试会给出大致相同的结果。
我将集合的初始化移到测试运行之前,并在运行后收集垃圾,如下所示:
list = new List<int>();
run(addList);
list = null;
GC.Collect();
将MaxDegreeOfParallelism
设置为 8 时,我得到以下结果:
- addList 需要 00:00:7959546
- addBag 需要 00:00:01.08023823
- addStack 需要 00:00:01.1354566 addQueue 需要 00:00
- :00.6597145
每次运行代码时,给出或接受 0.02 秒的偏差。
并发集合并不总是更快。 其中更多的人只在更高的争用级别看到性能增益,实际工作负载也会产生影响。查看 pfx 团队的这篇论文:)
http://blogs.msdn.com/b/pfxteam/archive/2010/04/26/9997562.aspx
不过要小心过早的优化。 把一些有效的东西放在一起,然后优化。 特别是因为实际工作量很重要。 此外,将锁作为性能瓶颈是非常好的,通常有一些 IO 或其他算法需要更长的时间:)
不要忘记,您不必将项目添加到集合中,还必须检索它们。因此,更公平的比较是基于监视器的队列
然后我在机器上得到以下结果(我将迭代次数增加了 10 倍):
- AddQueue1 需要 00:00:18.0119159
- AddQueue2 需要 00:00:13.3665991
但有趣的不仅仅是性能。看看这两种方法:检查Add/ConsumptionQueue1的正确性非常困难,而很容易看出Add/ConsumptionQueue2完全符合预期,这要归功于BlockingCollection
static Queue<int> queue1 = new Queue<int>();
static BlockingCollection<int> queue2 = new BlockingCollection<int>();
static void Main(string[] args)
{
Run(AddQueue1, ConsumeQueue1);
Run(AddQueue2, ConsumeQueue2);
Console.ReadLine();
}
private static void AddQueue1(int obj)
{
lock (queue1)
{
queue1.Enqueue(obj);
if (queue1.Count == 1)
Monitor.Pulse(queue1);
}
}
private static void ConsumeQueue1()
{
lock (queue1)
{
while (true)
{
while (queue1.Count == 0)
Monitor.Wait(queue1);
var item = queue1.Dequeue();
// do something with item
}
}
}
private static void AddQueue2(int obj)
{
queue2.TryAdd(obj);
}
private static void ConsumeQueue2()
{
foreach (var item in queue2.GetConsumingEnumerable())
{
// do something with item
}
}
private static void Run(Action<int> action, ThreadStart consumer)
{
new Thread(consumer) { IsBackground = true }.Start();
Stopwatch stopwatch = Stopwatch.StartNew();
Parallel.For(0, 100000000, new ParallelOptions() { MaxDegreeOfParallelism = 8 }, action);
stopwatch.Stop();
Console.WriteLine(action.Method.Name + " takes " + stopwatch.Elapsed);
}
看看添加和获取的性能比较。这是我使用的代码:
class Program
{
static List<int> list = new List<int>();
static ConcurrentBag<int> bag = new ConcurrentBag<int>();
static ConcurrentStack<int> stack = new ConcurrentStack<int>();
static ConcurrentQueue<int> queue = new ConcurrentQueue<int>();
static void Main(string[] args)
{
list = new List<int>();
run(addList);
run(takeList);
list = null;
GC.Collect();
bag = new ConcurrentBag<int>();
run(addBag);
run(takeBag);
bag = null;
GC.Collect();
stack = new ConcurrentStack<int>();
run(addStack);
run(takeStack);
stack = null;
GC.Collect();
queue = new ConcurrentQueue<int>();
run(addQueue);
run(takeQueue);
queue = null;
GC.Collect();
Console.ReadLine();
}
private static void takeList(int obj)
{
lock (list)
{
if (list.Count == 0)
return;
int output = list[obj];
}
}
private static void takeStack(int obj)
{
stack.TryPop(out int output);
}
private static void takeQueue(int obj)
{
queue.TryDequeue(out int output);
}
private static void takeBag(int obj)
{
bag.TryTake(out int output);
}
private static void addList(int obj) { lock (list) { list.Add(obj); } }
private static void addStack(int obj) { stack.Push(obj); }
private static void addQueue(int obj) { queue.Enqueue(obj); }
private static void addBag(int obj) { bag.Add(obj); }
private static void run(Action<int> action)
{
Stopwatch stopwatch = Stopwatch.StartNew();
Parallel.For(0, 10000000, new ParallelOptions()
{
MaxDegreeOfParallelism = 8
}, action);
stopwatch.Stop();
Console.WriteLine(action.Method.Name + " takes " + stopwatch.Elapsed);
}
}
输出为:
- addList 需要 00:00
- :00.8875893
- takeList 需要 00:00:00.7500289
- addBag 需要 00:00:01.8651759
- takeBag 需要 00:00:00.5749322
- addStack 需要 00:00:01.5565545
- takeStack 需要 00:00:00.3838718
- addQueue 需要 00:00:00.8861318
- 采取队列需要 00:00:01.0510706
是的,但重点是您需要一些并发多个线程,使用并发长时间运行它以查看平均性能,因为这没有考虑不同集合的锁定策略。