从多个生产者密钥公平排队



我有这样一个场景,我从数千个源接收事件。每个源都在发送有关其当前状态的信息。虽然我确实希望处理所有事件,但更重要的是首先处理每个源的最新事件,以便当前视图是最新的。因此,我正在考虑使用ConcurrentHashMap,每个源的标识符作为键,并使用LIFO队列(堆栈)作为值。然后,我将遍历Map的键,并从每个源的堆栈中取出一个项。

我担心的是,当我遍历键并从每个键的队列中取出项时,生产者可能会在队列中发布新事件,从而潜在地产生并发问题。制作人也可以向地图中添加新的键,并且通过MapentrySet进行迭代似乎是弱一致的。这不是一个大问题,因为新项将在随后的迭代中处理。理想情况下,我也可以在entrySet的流上使用一些并行处理来加速这个过程。

我想知道是否有一个更干净的方法。在现实中,我可以使用LIFO BlockingDequeue并首先处理最新的事件,但这种方法的问题是,存在一个源可能发送比其他源更多的事件的风险,因此可能比其他源处理更多的事件。

是否有其他的数据结构,我可以看看,提供这种行为?从本质上讲,我正在寻找一种方法来优先处理来自每个来源的事件,同时给每个来源一个公平的机会由消费者处理。

我建议您构建自己的结构来管理它,因为它特别为您的用例增加了灵活性(和速度)。

我会用一个循环队列来存储每个后进先出队列(堆栈)。在循环队列中,您将元素添加到尾部,并从头部读取(但不删除)元素。一旦头=尾,你就重新开始。

您可以使用一个简单的数组构建自己的队列。管理诸如向数组中添加更多队列以及在需要时扩展队列等操作的同步并不太难。我相信在数组中添加队列并不是你经常做的事情。

这很容易管理,你可以扩展你的循环队列来计算条目被访问的频率,并限制对其条目的访问频率(通过添加/删除消费者线程,或者甚至让它们在从条目管理的堆栈中消费之前等待一点)。

当使用多个线程从循环队列中读取元素时,你甚至可以避免线程锁定,通过让它们在从堆栈中消费之前调用"寄存器"操作:每个线程都有自己的ID,当它"注册"时,ID被存储在给定的队列条目中。在注册之前和从堆栈弹出之前,线程执行"读取注册ID"操作,返回的ID必须与自己的ID匹配。这意味着只有"拥有"给定队列条目的线程才能从该堆栈弹出。如果注册/确认注册过程失败,则意味着另一个线程正在使用该条目,因此当前线程将继续使用下一个可用的条目。

我以前就用过这种策略,而且效果很好。

您是否考虑过后进先出队列?每个源都添加到它的后进先出队列中,对于处理,您从FIFO队列中取出第一个后进先出队列,处理一个事件,然后将其放回FIFO队列中。这样,你也应该没有问题的新来源,因为他们的后进先出队列将简单地添加到先进先出队列。

为了将事件添加到正确的后进先出队列中,您可以维护一个额外的HashMap,该HashMap知道每个源的队列,如果一个新的源出现在Map中,您知道您必须将其后进先出队列添加到FIFO队列中。

相关内容

  • 没有找到相关文章