这是我第一次使用线程池,我不太明白执行器服务是如何工作的。我在图像上添加水印并将它们合并到一张空图片上。但即使我只使用一个线程,它仍然只会画一半。
这是我的WorkerThread 类:
public class WorkerThread implements Runnable {
BufferedImage source;
BufferedImage toDraw;
int x;
int y;
BufferedImage target;
ParallelWatermarkFilter pf;
public WorkerThread(BufferedImage source, BufferedImage toDraw, int x, int y, BufferedImage target){
this.source = source;
this.toDraw = toDraw;
this.x = x;
this.y = y;
this.target = target;
pf = new ParallelWatermarkFilter(source, 5);
}
@Override
public void run() {
pf.mergeImages(source, toDraw, x, y, target);
}
}
这就是我在FilterClass中使用 ExecutorService 的方式:
public BufferedImage apply(BufferedImage input) {
ExecutorService threadpool = Executors.newFixedThreadPool(numThreads);
for (int w = 0; w < imgWidth; w += watermarkWidth) {
for (int h = 0; h < imgHeight; h += watermarkHeight) {
Runnable worker = new WorkerThread(input, watermark, w, h, result);
System.out.println("WIDTH: " + w + " HEIGHT: " + h);
threadpool.execute(worker);
}
}
threadpool.shutdown();
线程不等到一个线程完成吗?
ThreadPoolExecutor
关闭和任务执行/清空工作队列/从工作队列中取出的事情是不雅的。因此,您不能依赖线程中断机制或其他机制。您得到的保证是:
启动有序关机,其中以前提交的任务是 已执行,但不接受任何新任务。调用没有 如果已关闭,则会产生附加效果。
此方法不等待以前提交的任务完成 执行。
为了更深入地了解ThreadPoolExecutor
实现,让我们看一下主要的执行方法:
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
这里的关键部分是呼唤getTask()
。它的片段是:
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
该方法不同步,仅依赖于 CAS 提供的ctl
值提供的排序。ctl
这里是存储在AtomicInteger
中的全局池状态(用于非阻塞原子ThreadPoolExecutor
状态获取(。
因此,以下情况是可能的。
- 名为
getTask
的工作线程 - 获取池的工作线程运行状态。它仍然
RUNNING
. - 另一个线程启动了订单关闭并相应地修改
ctl
。
工作 - 线程已从工作队列中获取任务。