我有一个消费者服务,使用EasyNetQ订阅者从RabbitMQ队列检索消息。处理每条消息需要几十秒,我需要并行运行它们,以确保我能跟上生产者的步伐。但是,每条消息都有一个属性,称之为groupingId。重要的是,具有相同groupingId的任务不能同时执行,因为这会导致资源冲突。
可能有数百个groupingId,在通常的实践中,任何时候都不会有太多具有相同Id的消息。然而,数据可能是突发的,导致一次发生数百个相同Id的集群。
我认为TPL Dataflow可能很适合,但我对它不太熟悉,也不知道如何用它实现我需要的。任何指导都将不胜感激。
创建一个分组ID字典并锁定它们。
首先,在某个地方创建字典,可能是作为成员变量。
ConcurrentDictionary<int,object> _locks = new ConcurrentDictionary<int, object>();
当您需要处理消息时,请使用此逻辑。
if (!_locks.ContainsKey(message.GroupingID))
{
_locks.TryAdd(message.GroupingID, new object());
}
lock (_locks[message.GroupingID])
{
ProcessMessage(message);
}