当达到队列容量时,ThreadPoolExecutor在创建新线程后首先执行队列中的任务,而不是执行新的传入作业



我写了一个简单的演示来模拟演示我面临的问题。

public abstract class ThreadPoolTest {
private static final int CORE_POOL_SIZE = 5;
private static final int QUEUE_CAPACITY = 15;
private static final int MAX_POOL_SIZE = 50;
public static void main(String[] args) throws InterruptedException {
BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(QUEUE_CAPACITY);
ThreadPoolExecutor pool = new ThreadPoolExecutor(CORE_POOL_SIZE, MAX_POOL_SIZE, 1l, TimeUnit.SECONDS, workQueue);
execueInBatch(pool, 25);
pool.shutdown();
}
@SuppressWarnings("boxing")
private static void execueInBatch(ThreadPoolExecutor pool, int num) throws InterruptedException {
for (int i = 1; i <= num; i++) {
final Integer it = Integer.valueOf(i);
try {
System.out.println("About to start " + i);
CompletableFuture.runAsync(() -> {
System.out.println("Started- " + it);
try {
Thread.sleep(5000l);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Completed- " + it);
}, pool);
} catch (Exception e) {
System.err.println(e.getMessage() + " for-" + i);
}
}
}
}

响应如下:
即将启动1
将启动2
准备启动3
已启动-1
正要启动4
已经启动-2
要启动5
开始-3
快要启动6
又要启动7
还要启动8
打算启动9
启动-4
将会启动10
想要启动11
想启动12
就要启动13
就会启动14
将要start 15
将要启动16
即将启动17
已启动-5
准备启动18
约要启动20
拟启动21
预计启动22
计划启动23
开始-21
已启动-22
开始-23
计划启动24
待启动25
已经启动-24
已经启动-25
完成-1
启动-6
完成-3
已完成-2
已完成-4
开始-8
已经开始-7
启动-9
完成-5
开始-10
完成-23
已完成-21
已经开始-11
结束-22
启动-12
刚开始-13
结束-25
完整-24
刚开始-14
刚刚开始-15
完整-6
正在启动-16
正在完成-8
刚刚完成-9
Started-17
已完成-7
已开始-18
已经开始-19
已完成-10
开始-20
完成-13
已完成-11
完成-12
已经完成-15
已经完成-14
正在完成-16
正在完成-17
在完成-18
完成-19
结束-20

我需要的是在开始任务21之前先开始任务6。有办法实现吗?

当池中有多个线程时,你无法保证哪一个线程会真正首先进入可运行的任务,除非池大小是一个-并且没有任何东西同时运行-或者如果你在每个任务中添加额外的屏障/检查,以保证并行执行中特定的顺序启动顺序。

此示例显示对每个整数使用CountDownLatch。每个任务在倒计时它自己的锁存器之前检查是否触发了上一个整数的CountDownLatch,然后继续执行该任务项。

private static void execueInBatch(ThreadPoolExecutor pool, int num) throws InterruptedException {
CountDownLatch prev = new CountDownLatch(0);
for (int i = 1; i <= num; i++) {
final Integer it = Integer.valueOf(i);
final CountDownLatch thislatch = new CountDownLatch(1);
final CountDownLatch prevlatch = prev;
try {
System.out.println("About to start " + i+" in "+Thread.currentThread());
CompletableFuture.runAsync(() -> {
try {
// This check is not needed, it just shows how often threads are running out of the add sequence:
if(!prevlatch.await(1, TimeUnit.NANOSECONDS))
System.out.println("Note: out of sequence run - " + it+ " in "+Thread.currentThread());
// Await event start of previous value of "it" to guarantee sequential start
prevlatch.await();
System.out.println("Started- " + it+ " in "+Thread.currentThread());
// Count down as this one is starting
thislatch.countDown();
Thread.sleep(5000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Completed- " + it+ " in "+Thread.currentThread());
}, pool);
} catch (Exception e) {
System.err.println(e.getMessage() + " for-" + i+ " in "+Thread.currentThread());
}
prev = thislatch;
}
}

这不是一个理想的解决方案,因为无论何时首先处理一个无序的值(如您所观察到的项目21(,它都会阻塞某些池线程,而那些较晚的值线程将等待其他线程上的早期操作结束,然后等待其他池线程在两者之间拾取项目。

最新更新