我使用BufferBlock
实现了一个生产者-消费者。代码运行良好。
static async Task Produce(ITargetBlock<int> queue)
{
try
{
// Post messages to the block asynchronously.
for (int i = 0; i < 100; i++)
{
Console.WriteLine("Sending: {0}", i);
await queue.SendAsync(i);
}
}
finally
{
queue.Complete();
}
}
static async Task Consume(ISourceBlock<int> queue)
{
// Read messages from the block asynchronously.
while (await queue.OutputAvailableAsync())
{
int value = await queue.ReceiveAsync();
Console.WriteLine("Receiving: {0}", value);
}
}
static void Main(string[] args)
{
// Create a BufferBlock<int> object.
var queue = new BufferBlock<int>();
try
{
var produce = Produce(queue);
var consume = Consume(queue);
Task.WaitAll(produce, consume, queue.Completion);
}
catch (Exception exception)
{
Console.WriteLine("An exception was thrown: {0}", exception.Message);
Console.WriteLine("Terminating...");
}
}
现在我有一个节流问题,那就是我希望消费者的最大并发数是4。我想使用SemaphoreSlim
机器人不知道如何应用它。
注意:这是一个并发调度程序问题,而不是并行性问题。
如果您只想一次消耗一定的量,您只需多次调用TryRecieve
,直到它变空或达到该量。这里有一个处理这个问题的扩展方法:
public static bool TryReceive<T>(this BufferBlock<T> bufferBlock, int count, out IList<T> items)
{
items = new List<T>();
for (var i = 0; i < count; i++)
{
T item;
if (bufferBlock.TryReceive(out item))
{
items.Add(item);
}
else
{
break;
}
}
return items.Any();
}
因此,消费者变成:
static async Task Consume(BufferBlock<int> queue)
{
// Read messages from the block asynchronously.
while (await queue.OutputAvailableAsync())
{
IList<int> values;
queue.TryReceive(4, out values);
Console.WriteLine("Receiving: {0}", string.Join(", ", values));
}
}