如何设计互斥但独立并发方法的任务队列?



我在一次求职面试中被问到一个并发问题,最终归结为以下要求。我能够简单地通过使用相互排斥来实现#2到#4,但不是#1。

使用以下方法设计任务队列:

public void registerCallback(Runnable task)

public void eventFired()

  1. 多个线程应该能够将任务放在队列中,可能同时进行。
  2. eventFired只应调用一次。
  3. 如果以前调用过eventFired,则以后对任一方法的任何调用都应引发异常。
  4. 如果在执行registerCallback时调用eventFired,请将触发事件延迟到稍后的时间。
  5. 如果在执行eventFired时调用registerCallback,则引发异常。

ReentrantReadWriteLock似乎很有希望,因为registerCallback可以获得读锁,并eventFired写锁,但这并不能解决调用registerCallback然后eventFired的争用条件。

有什么想法吗?

ReentrantReadWriteLock似乎很有

希望,因为registerCallback可以获得读锁,并eventFired写锁,但这并不能解决调用registerCallback然后eventFired的竞争条件。

注册回调,换句话说,修改数据结构,同时保持读锁定,从来都不是一个好主意。您需要另一个线程安全构造来存储回调,并且不必要地使代码复杂化。

注册回调的操作,例如将引用存储到某个集合中或实例化任何类型的节点对象,非常简单,允许使用普通的互斥锁,因为它不会长时间持有。

无论您使用synchronizedLock还是支持原子更新的状态,在任何一种情况下,竞争条件都不存在,因为不能有registerCallbackeventFired重叠。如果使用得当,所有这些方法都会为这些操作带来秩序。所以每个registerCallback要么是,在第一个eventFired之前,要么在它之后。

实现这一点可能很简单:

public class JobQueue {
private List<Runnable> callbacks = new ArrayList<>();
public synchronized void registerCallback(Runnable task) {
if(callbacks == null) {
throw new IllegalStateException("Event already fired");
}
callbacks.add(Objects.requireNonNull(task));
}
public void eventFired() {
List<Runnable> list;
synchronized(this) {
list = callbacks;
callbacks = null;
}
if(list == null) {
throw new IllegalStateException("Can only fire once");
}
for(Runnable r: list) r.run();
}
}

synchronized块中执行的代码非常短,以至于争用对于大多数实际用例都无关紧要。用Lock实现相同的内容很简单,但不会有任何优势。事实上,特定于 JVM 的优化可能会使基于synchronized的解决方案更加高效。

为了完整起见,这里有一个基于原子更新的解决方案:

public class JobQueue {
private static final Runnable DONE = () -> {};
private final AtomicReference<Runnable> pending = new AtomicReference<>();
public void registerCallback(Runnable task) {
Objects.requireNonNull(task);
for(;;) {
Runnable previous = pending.get();
if(previous == DONE) throw new IllegalStateException("Event already fired");
if(pending.compareAndSet(previous, sequence(previous, task))) return;
}
}
public void eventFired() {
Runnable previous = pending.getAndSet(DONE);
if(previous == DONE) throw new IllegalStateException("Can only fire once");
if(previous != null) previous.run();
}
static Runnable sequence(Runnable a, Runnable b) {
return a == null? b: () -> { a.run(); b.run(); };
}
}

实际上,多个registerCallback和/或eventFired调用的执行可能会重叠,但在这种情况下,只有一个可以成功执行关键的原子更新。这会在操作中引入一个顺序,最多使一个eventFired调用成功,并将所有registerCallback调用分类为之前之后

最新更新