用于不同任务类型的闩锁



我正在寻找解决以下问题的java并发解决方案。

有一些任务正在运行,还有一段代码 C。

  • C 必须等待所有任务完成。(超时(

  • 在 C 完成之前,不得开始任何任务。

我浏览了java.concurrency包,发现了一些有趣的东西,但似乎没有什么工作是完全正确的:

  • 相位器将允许单向阻塞,但不允许双向阻塞。
  • 信号量,ForkJoinTasks和其他具有计数器类型功能,但似乎没有一个能做到我想要的。

我相信我可以使用相位器和锁来构建一些东西,如下所示:

void C() {
synchronized(lock) {
phaser.awaitAdvanceInterruptibly(phase, 1, TimeUnit.SECONDS);
// Start work anyway if a task is taking too long.
doWork();
}
}
void someTask() {
synchronized(lock) {
phaser.register();
}
doTask().thenRun(
() -> phaser.arriveAndDeregister()
);
}

现在,虽然我相当确定这会起作用,但我也意识到尝试构建自己的并发解决方案是一个坏主意。有没有更好的方法

?如果没有,我将用什么来phase论点?

编辑:此问题在涉及Web客户端连接的项目中,因此任务无法预测到达。但是,这种情况可以通过更仔细的设计来避免。

这是一个专门的用例,我认为我们需要使用多个并发实用程序进行协调。下面的程序应该可以做到这一点。请随时发布任何不清楚的部分的问题 -

import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;
public class TestClass {
private volatile int numOfActiveTasks = 0;
private Semaphore cSemaphore = new Semaphore(1);
private Semaphore taskSemaphore = new Semaphore(1);
private Object tasksLock = new Object();
//Test method
public static void main(String[] args) throws IOException {
TestClass testClass = new TestClass();
//Launch some task threads
ExecutorService taskES = Executors.newFixedThreadPool(2);
IntStream.range(1, 11).forEach((i) -> taskES.submit(() -> {
try {
testClass.executeTask();
} catch (InterruptedException e) {
e.printStackTrace();
}
}));
//Launch some C threads
ExecutorService cES = Executors.newFixedThreadPool(2);
IntStream.range(1, 5).forEach((i) -> cES.submit(() -> {
try {
testClass.C();
} catch (InterruptedException e) {
e.printStackTrace();
}
}));
taskES.shutdown();
cES.shutdown();
}

void C() throws InterruptedException {
try {
cSemaphore.acquire();
//If tasks are running, wait at-least n seconds
this.taskSemaphore.tryAcquire(1, TimeUnit.SECONDS);
print("C started running");
doCsWork();
} finally {
cSemaphore.release();
print("C stopped running");
}
}
void executeTask() throws InterruptedException {
//Do not start while C is running
cSemaphore.acquire();
cSemaphore.release();
synchronized (tasksLock) {
++numOfActiveTasks;
taskSemaphore.tryAcquire();
print("A task started running. Total " + numOfActiveTasks + " tasks running");
}
doTasksWork();
synchronized (tasksLock) {
--numOfActiveTasks;
if (numOfActiveTasks == 0) {
taskSemaphore.release();
}
print("A task stopped running. Total " + numOfActiveTasks + " tasks remaining");
}
}
void doCsWork() throws InterruptedException {
Thread.sleep(1000);
}
void doTasksWork() throws InterruptedException {
Thread.sleep(2000);
}
void print(String message) {
System.out.println(message);
}
}

我在java.util.concurrent.locks中找到了这个问题的解决方案,非常适合我的用例。

StampedLock lock;
void C() {
long stamp = lock.tryWriteLock(1, TimeUnit.SECONDS);
doWork();
lock.unlockWrite(stamp);
}
void someTask() {
long stamp = lock.readLock();
doTask().thenRun(() -> lock.unlockRead(stamp));
}

StampedLock 类的关键是 readLock(( 不是独占的,而 writeLock(( 是独占的。它还支持超时,类似于常规锁定。

最新更新