我希望两个线程使用一个队列。第一个线程应每 2 秒调用一次,第二个线程应每 3 秒调用一次。两个线程应同时启动。我在访问队列的第一个元素时遇到问题。两个线程都采用索引为 0 的元素。有时它发生在队列的其他元素上,而不仅仅是第一个元素。我在控制台上有这样的输出:
- 项目 0 处理时间 1 时间:3:27:8
- 项目 0 处理者 2 时间:3:27:8
- 项目 2 处理者 1 时间:3:27:10
- 项目 3 处理者 2 时间: 3:27:11
- 项目 4 处理者 1 时间: 3:27:12
等等..
这是我使用的代码:
ConcurrentQueue<int> sharedQueue = new ConcurrentQueue<int>();
for (int i = 0; i < 10; i++)
{
sharedQueue.Enqueue(i);
}
int itemCount= 0;
Task[] tasks = new Task[2];
for (int i = 0; i < tasks.Length; i++)
{
// create the new task
tasks[i] = new Task(() =>
{
while (sharedQueue.Count > 0)
{
// define a variable for the dequeue requests
int queueElement;
// take an item from the queue
bool gotElement = sharedQueue.TryDequeue(out queueElement);
// increment the count of items processed
if (gotElement)
{
DateTime dt = DateTime.Now;
Console.WriteLine("Item " + itemCount + "processed by "
+ Task.CurrentId + " Time: " + dt.Hour + ":" + dt.Minute + ":" + dt.Second);
Interlocked.Increment(ref itemCount);
if (Task.CurrentId == 1)
Thread.Sleep(2000);
else
Thread.Sleep(3000);
}
}
});
// start the new task
tasks[i].Start();
}
// wait for the tasks to complete
Task.WaitAll(tasks);
// report on the number of items processed
Console.WriteLine("Items processed: {0}", itemCount);
// wait for input before exiting
Console.WriteLine("Press enter to finish");
Console.ReadLine();
}
替换以下行:
Console.WriteLine("Item " + itemCount + "processed by " ...);
有了这一行:
Console.WriteLine("Item " + queueElement + "processed by " ...);
您看到的问题可能是由于任务几乎同时执行Console.WriteLine
,并且两者都看到了相同的itemCount
值,因为它们以尚未发生Interlocked.Increment
调用的方式交错。无论如何,打印出queueElement
可能更有意义,因为它更有意义。
请参阅Brian Gideon关于您的itemCount
问题的出色回答。
您可以考虑重写代码以使用 BlockingCollection 而不是 ConcurrentQueue<T>
。使用起来要容易得多。 BlockingCollection
是并发集合的包装器。在其默认配置中,后备存储是ConcurrentQueue
。因此,您可以获得相同的并发队列功能,但界面要好得多。
BlockingCollection<int> sharedQueue = new BlockingCollection<int>();
for (int i = 0; i < 10; i++)
{
sharedQueue.Add(i);
}
// CompleteAdding marks the queue as "complete for adding,"
// meaning that no more items will be added.
sharedQueue.CompleteAdding();
int itemCount= 0;
Task[] tasks = new Task[2];
for (int i = 0; i < tasks.Length; i++)
{
// create the new task
tasks[i] = new Task(() =>
{
foreach (var queueElement in sharedQueue.GetConsumingEnumerable())
{
DateTime dt = DateTime.Now;
Console.WriteLine("Item " + itemCount + "processed by "
+ Task.CurrentId + " Time: " + dt.Hour + ":" + dt.Minute + ":" + dt.Second);
Interlocked.Increment(ref itemCount);
if (Task.CurrentId == 1)
Thread.Sleep(2000);
else
Thread.Sleep(3000);
}
});
// start the new task
tasks[i].Start();
}
GetConsumingEnumerable 返回一个枚举器,该枚举器将从队列中获取下一项,直到队列为空。它还可以很好地处理取消,这对于ConcurrentQueue
来说有点困难。
一般来说,任何时候你想使用 ConcurrentQueue<T>
,你可能想要BlockingCollection<T>
.