同步线程池执行程序服务



这是我第一次使用线程池,我不太明白执行器服务是如何工作的。我在图像上添加水印并将它们合并到一张空图片上。但即使我只使用一个线程,它仍然只会画一半。

这是我的WorkerThread 类

public class WorkerThread implements Runnable {
BufferedImage source;
BufferedImage toDraw;
int x;
int y;
BufferedImage target;
ParallelWatermarkFilter pf;
public WorkerThread(BufferedImage source, BufferedImage toDraw, int x, int y, BufferedImage target){
this.source = source;
this.toDraw = toDraw;
this.x = x;
this.y = y;
this.target = target;
pf = new ParallelWatermarkFilter(source, 5);
}
@Override
public void run() {
pf.mergeImages(source, toDraw, x, y, target);
}
}

这就是我在FilterClass中使用 ExecutorService 的方式:

public BufferedImage apply(BufferedImage input) {
ExecutorService threadpool = Executors.newFixedThreadPool(numThreads);
for (int w = 0; w < imgWidth; w += watermarkWidth) {
for (int h = 0; h < imgHeight; h += watermarkHeight) {
Runnable worker = new WorkerThread(input, watermark, w, h, result);
System.out.println("WIDTH: " + w + "   HEIGHT: " + h);
threadpool.execute(worker);
}
}
threadpool.shutdown();

线程不等到一个线程完成吗?

ThreadPoolExecutor关闭和任务执行/清空工作队列/从工作队列中取出的事情是不雅的。因此,您不能依赖线程中断机制或其他机制。您得到的保证是:

启动有序关机,其中以前提交的任务是 已执行,但不接受任何新任务。调用没有 如果已关闭,则会产生附加效果。

此方法不等待以前提交的任务完成 执行。

为了更深入地了解ThreadPoolExecutor实现,让我们看一下主要的执行方法:

final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted.  This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}

这里的关键部分是呼唤getTask()。它的片段是:

if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}

该方法不同步,仅依赖于 CAS 提供的ctl值提供的排序。ctl这里是存储在AtomicInteger中的全局池状态(用于非阻塞原子ThreadPoolExecutor状态获取(。

因此,以下情况是可能的。

  1. 名为getTask的工作线程
  2. 获取池的工作线程运行状态。它仍然RUNNING.
  3. 另一个线程启动了订单关闭并相应地修改ctl
  4. 工作
  5. 线程已从工作队列中获取任务。

最新更新