在并行执行中对任务进行分组,以防止同时执行具有相同groupingID的消息



我有一个消费者服务,使用EasyNetQ订阅者从RabbitMQ队列检索消息。处理每条消息需要几十秒,我需要并行运行它们,以确保我能跟上生产者的步伐。但是,每条消息都有一个属性,称之为groupingId。重要的是,具有相同groupingId的任务不能同时执行,因为这会导致资源冲突。

可能有数百个groupingId,在通常的实践中,任何时候都不会有太多具有相同Id的消息。然而,数据可能是突发的,导致一次发生数百个相同Id的集群。

我认为TPL Dataflow可能很适合,但我对它不太熟悉,也不知道如何用它实现我需要的。任何指导都将不胜感激。

创建一个分组ID字典并锁定它们。

首先,在某个地方创建字典,可能是作为成员变量。

ConcurrentDictionary<int,object> _locks = new ConcurrentDictionary<int, object>();

当您需要处理消息时,请使用此逻辑。

if (!_locks.ContainsKey(message.GroupingID))
{
_locks.TryAdd(message.GroupingID, new object());
}
lock (_locks[message.GroupingID])
{
ProcessMessage(message);
}

最新更新