当某些事务的顺序很重要时,我如何对队列使用者进行多线程处理



我有一个任务队列,这些任务对一组对象进行操作(例如,假设对象是地址簿中的条目)。

一个示例任务可能是"将Joe的电话号码更新为888-555-1212"。

队列中可以同时有多个"更新乔的电话号码…"任务,但电话号码不同。在这种情况下,必须应用更新,以确保最后的状态是正确的(不,为了论证起见,不可能在任务上加上时间戳,在通讯簿条目上加时间戳,并丢弃过时的任务)。

将Jane的更新应用到Joe的更新中是安全的。

我想对队列进行多线程处理,但我需要按人同步访问。

有方便的图书馆放这种东西吗?还是我只能使用Executor并在Runnable的run()方法中对"name"进行自己的同步?

这个问题的一个简单但并不完美的解决方案是在一个数组中维护一组子队列,其数量等于您正在运行的处理线程的数量。单个主线程从单个主队列中提取项目,并将它们添加到通过对象键的hashCode(标识和关联任务的hashCode)的模索引的子队列中。

例如

int queueIndex = myEntity.getKey().hashCode() % queues.length;

只有一个线程处理该队列,并且同一实体的所有任务都将提交到该队列,因此不存在竞争条件。

这种解决方案并不完美,因为有些线程最终可能会比其他线程拥有更大的队列。实际上,这不太重要,但需要考虑。

简单解决方案的问题:

更简单的解决方案是从单个队列中提取项目,然后锁定受影响实体的不同内容,该解决方案具有竞争条件(正如Aurand所指出的)。给定:

Master Queue [ Task1(entity1), Task2(entity1), ... ]

如果task1task2都编辑同一实体entity1,并且有thread1thread2在队列上操作,则预期/期望的事件序列为:

  • 线程1接受任务1
  • 实体1上的线程1锁定
  • 线程1编辑实体1
  • 线程1解锁实体1
  • 线程2接受任务2
  • 线程2锁定实体1
  • 线程2编辑实体1
  • 线程2解锁实体1

不幸的是,即使锁是线程运行方法的第一条语句,也可能出现以下序列:

  • 线程1接受任务1
  • 线程2接受任务2
  • 线程2锁定实体1
  • 线程2编辑实体1
  • 线程2解锁实体1
  • 线程1锁定实体1
  • 线程1编辑实体1
  • 线程1解锁实体1

为了避免这种情况,每个线程在从队列中获取任务之前都必须锁定某些东西(比如队列),然后在保持父锁的同时获取实体的锁。但是,您不希望在持有该父锁并等待获取实体锁时阻塞所有内容,因此您只需要尝试获取实体锁,然后在获取失败时处理该情况(可能将其放入另一个队列)。总的来说,情况变得无关紧要。

此类冲突总是通过为每个对象分配一个版本来解决。每次更新时,版本都会递增。因此,如果一个更新出现在错误的时间,它可能会被取消或延迟。无论如何,你都应该有一种方法来决定哪一个更新是第一个,哪一个是第二个。这种方法称为乐观锁定。

一种可能的解决方案

假设某个任务由某类描述

class Task {
  Integer taskGroup;
  // other
}

其中taskGroup是一个ID,用于标识必须按到达顺序进行处理的任务(在您的示例中,每个"名称"可以定义自己的taskGroup,或者更一般地说,具有相同名称的任务属于同一taskGroup)。

mainTaskQueue表示Task对象的一个List。然后

  • 创建一个Map<Integer,List<Task>>,比如taskGroupsQueues
  • 为每个taskGroup创建一个线程,该线程按顺序在taskGroupsQueues.get(taskGroup)上运行
  • 主线程从主任务列表mainTaskQueue中删除task并将其附加到taskGroups.get(task.taskGroup)
  • 将任务从主队列移动到单个队列和从单个队列获取必须同步

换句话说:属于同一名称的任务在同一个线程上执行。

请注意,如果主线程执行任务的分配,那么他也可以执行某种负载平衡,即,如果由于顺序一致性,任务没有被强制到特定队列,则该任务应该转到shortes队列。然而,你的问题可能会变成单线程的,这是固有的——也就是说,当你只有属于同一任务组的任务时(在你的案例名称中)。

另一种可能的解决方案(未测试,只是一个建议)

正如increment1s帖子和Aulands评论中所指出的,胎面内任务组(名称)的同步存在一些问题。基本上:为时已晚,因为执行器可能已经启动了两个试图在同一名称上同步的线程。但是,您可以尝试确保执行者级别的执行顺序。例如,请参阅以下文章:Java执行程序:如何设置任务优先级?(它引用传递给执行器的PriorityBlockingQueue)。

最新更新