我有一个计算密集型方法Calculate
,可能会运行几秒钟,请求来自多个线程。
应该只执行一个Calculate
,后续请求应该排队,直到初始请求完成。如果已经有一个请求排队,那么后续请求可以被丢弃(因为排队的请求将是足够的)
似乎有很多可能的解决方案,但我只需要最简单的。
更新:这是我的初步尝试:
private int _queueStatus;
private readonly object _queueStatusSync = new Object();
public void Calculate()
{
lock(_queueStatusSync)
{
if(_queueStatus == 2) return;
_queueStatus++;
if(_queueStatus == 2) return;
}
for(;;)
{
CalculateImpl();
lock(_queueStatusSync)
if(--_queueStatus == 0) return;
}
}
private void CalculateImpl()
{
// long running process will take a few seconds...
}
IMO最简单,最干净的解决方案是使用TPL Dataflow
(一如既往)与BufferBlock
作为队列。BufferBlock
是线程安全的,支持async-await
,更重要的是,TryReceiveAll
可以一次获取所有项。它也有OutputAvailableAsync
,所以你可以异步等待项目被提交到缓冲区。当发布多个请求时,您只需选择最后一个请求,而忘记其他请求:
var buffer = new BufferBlock<Request>();
var task = Task.Run(async () =>
{
while (await buffer.OutputAvailableAsync())
{
IList<Request> requests;
buffer.TryReceiveAll(out requests);
Calculate(requests.Last());
}
});
用法:
buffer.Post(new Request());
buffer.Post(new Request());
Edit:如果您没有Calculate
方法的任何输入或输出,您可以简单地使用boolean
作为开关。如果它为真,你可以关闭它并计算,如果它在Calculate
运行时再次为真,那么再次计算:
public bool _shouldCalculate;
public void Producer()
{
_shouldCalculate = true;
}
public async Task Consumer()
{
while (true)
{
if (!_shouldCalculate)
{
await Task.Delay(1000);
}
else
{
_shouldCalculate = false;
Calculate();
}
}
}
一次只取1个值的BlockingCollection
窍门是,如果集合
我同意I3aron +1的答案
这(可能)是一个BlockingCollection解决方案
public static void BC_AddTakeCompleteAdding()
{
using (BlockingCollection<int> bc = new BlockingCollection<int>(1))
{
// Spin up a Task to populate the BlockingCollection
using (Task t1 = Task.Factory.StartNew(() =>
{
for (int i = 0; i < 100; i++)
{
if (bc.TryAdd(i))
{
Debug.WriteLine(" add " + i.ToString());
}
else
{
Debug.WriteLine(" skip " + i.ToString());
}
Thread.Sleep(30);
}
bc.CompleteAdding();
}))
{
// Spin up a Task to consume the BlockingCollection
using (Task t2 = Task.Factory.StartNew(() =>
{
try
{
// Consume consume the BlockingCollection
while (true)
{
Debug.WriteLine("take " + bc.Take());
Thread.Sleep(100);
}
}
catch (InvalidOperationException)
{
// An InvalidOperationException means that Take() was called on a completed collection
Console.WriteLine("That's All!");
}
}))
Task.WaitAll(t1, t2);
}
}
}
这听起来像是典型的生产者-消费者关系。我建议查看BlockingCollection<T>
。它是System.Collection.Concurrent
命名空间的一部分。在此基础上,您可以实现您的队列逻辑。
你可以给BlockingCollection
提供任何内部结构来保存它的数据,比如ConcurrentBag<T>
, ConcurrentQueue<T>
等等。后者是使用的默认结构