我是我的代码,我将一些任务提交给ExecutorService,然后使用shutdown((和awaitTermination((等待它们完成。但是,如果任何一个任务需要超过一定时间才能完成,我希望在不影响其他任务的情况下取消它。我使用来自 ExecutorService 的代码修正代码,该代码在超时后中断任务,如下所示:
package com.jthink.jaikoz.memory;
import com.jthink.jaikoz.MainWindow;
import java.util.List;
import java.util.concurrent.*;
public class TimeoutThreadPoolExecutor extends ThreadPoolExecutor {
private final long timeout;
private final TimeUnit timeoutUnit;
private boolean isShutdown = false;
private final ScheduledExecutorService timeoutExecutor = Executors.newSingleThreadScheduledExecutor();
//Map Task to the Timeout Task that could be used to interrupt it
private final ConcurrentMap<Runnable, ScheduledFuture> runningTasks = new ConcurrentHashMap<Runnable, ScheduledFuture>();
public long getTimeout()
{
return timeout;
}
public TimeUnit getTimeoutUnit()
{
return timeoutUnit;
}
public TimeoutThreadPoolExecutor(int workerSize, ThreadFactory threadFactory, long timeout, TimeUnit timeoutUnit)
{
super(workerSize, workerSize, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), threadFactory);
this.timeout = timeout;
this.timeoutUnit = timeoutUnit;
}
public TimeoutThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, long timeout, TimeUnit timeoutUnit) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
this.timeout = timeout;
this.timeoutUnit = timeoutUnit;
}
public TimeoutThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, long timeout, TimeUnit timeoutUnit) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
this.timeout = timeout;
this.timeoutUnit = timeoutUnit;
}
public TimeoutThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler, long timeout, TimeUnit timeoutUnit) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
this.timeout = timeout;
this.timeoutUnit = timeoutUnit;
}
public TimeoutThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler, long timeout, TimeUnit timeoutUnit) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
this.timeout = timeout;
this.timeoutUnit = timeoutUnit;
}
@Override
public void shutdown() {
isShutdown = true;
super.shutdown();
}
@Override
public List<Runnable> shutdownNow() {
timeoutExecutor.shutdownNow();
return super.shutdownNow();
}
@Override
protected void beforeExecute(Thread t, Runnable r) {
if(timeout > 0) {
//Schedule a task to interrupt the thread that is running the task after time timeout
final ScheduledFuture<?> scheduled = timeoutExecutor.schedule(new TimeoutTask(t), timeout, timeoutUnit);
//Add Mapping
runningTasks.put(r, scheduled);
}
}
@Override
protected void afterExecute(Runnable r, Throwable t) {
//Remove mapping and cancel timeout task
ScheduledFuture timeoutTask = runningTasks.remove(r);
if(timeoutTask != null) {
timeoutTask.cancel(false);
}
if (isShutdown)
{
if(getQueue().isEmpty())
{
//Queue is empty so all tasks either finished or currently running
MainWindow.logger.severe("---Thread Pool Queue is Empty");
timeoutExecutor.shutdown();
}
}
}
/**
* Interrupt the thread
*
*/
class TimeoutTask implements Runnable {
private final Thread thread;
public TimeoutTask(Thread thread) {
this.thread = thread;
}
@Override
public void run() {
MainWindow.logger.severe("Cancelling task because taking too long");
thread.interrupt();
}
}
}
以及任务何时有时间完成以及何时不能按预期工作的测试用例
package com.jthink.jaikoz;
import com.jthink.jaikoz.memory.TimeoutThreadPoolExecutor;
import junit.framework.TestCase;
import java.util.concurrent.Callable;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
/**
* Created by Paul on 08/12/2014.
*/
public class TestThreadPool extends TestCase
{
public void testThreadPoolTasksComplete() throws Exception
{
final TimeoutThreadPoolExecutor executorService = new TimeoutThreadPoolExecutor(10, 10, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), 6, TimeUnit.SECONDS);
for (int i = 0; i < 10; i++)
{
executorService.submit(new Callable<Object>()
{
@Override
public Object call() throws Exception
{
Thread.sleep(5000);
System.out.println("Done");
return null;
}
});
}
executorService.shutdown();
executorService.awaitTermination(1, TimeUnit.DAYS);
System.out.println("Program done");
}
public void testThreadPoolTasksCancelled() throws Exception
{
final TimeoutThreadPoolExecutor executorService = new TimeoutThreadPoolExecutor(10, 10, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), 3, TimeUnit.SECONDS);
for (int i = 0; i < 10; i++)
{
executorService.submit(new Callable<Object>()
{
@Override
public Object call() throws Exception
{
Thread.sleep(5000);
System.out.println("Done");
return null;
}
});
}
executorService.shutdown();
executorService.awaitTermination(1, TimeUnit.DAYS);
System.out.println("Program done");
}
}
在我的代码中似乎有效:
private boolean matchToRelease(ListMultimap<MatchKey, MetadataChangedWrapper> matchKeyToSongs)
throws JaikozException
{
if (stopTask)
{
MainWindow.logger.warning("Analyser stopped detected in matchToRelease");
return false;
}
TimeoutThreadPoolExecutor es = getExecutorService();
List<Future<Boolean>> futures = new ArrayList<Future<Boolean>>(matchKeyToSongs.size());
for(MatchKey matchKey:matchKeyToSongs.keySet())
{
List<MetadataChangedWrapper> songs = matchKeyToSongs.get(matchKey);
futures.add(es.submit(new CorrectFromMusicBrainzWorker(this, stats, matchKey, songs)));
}
es.shutdown();
try
{
es.awaitTermination(matchKeyToSongs.keySet().size() * es.getTimeout(), es.getTimeoutUnit());
}
catch(InterruptedException ie)
{
MainWindow.logger.warning(this.getClass() + " has been interrupted");
return false;
}
return true;
}
然而,即使对于一个客户
---Thread Pool Queue is Empty
是输出 awaitTermination(( 不返回,只有在用户两小时后取消任务时才最终返回 - 完整日志提取在这里
14/12/2014 20.44.19:com.jthink.jaikoz.manipulate.CorrectFromMusicBrainzWorker:getSongsNotMatched:SEVERE: /Volumes/2TB External/New iTunes Library/iTunes Media/Music/XTC:albumMetadataMatchingCounts11:AlreadyMatched:2:ToMatch:11
14/12/2014 20.44.19:com.jthink.jaikoz.memory.TimeoutThreadPoolExecutor:afterExecute:SEVERE: ---Thread Pool Queue is Empty
14/12/2014 22.18.01:com.jthink.jaikoz.manipulate.ExecutorServiceEnabledAnalyser:cancelTask:WARNING: Cancelling class com.jthink.jaikoz.manipulate.CorrectFromMusicBrainzAnalyser Task
14/12/2014 22.18.01:com.jthink.jaikoz.manipulate.CorrectFromMusicBrainzAnalyser:matchToRelease:WARNING: class com.jthink.jaikoz.manipulate.CorrectFromMusicBrainzAnalyser has been interrupted
那么,即使日志显示队列为空,因此在执行器本身和嵌入式超时执行器上都调用了 shutdown((,awaiTermination(( 怎么可能不返回呢?
我自己对此有一些想法,但不知道答案。
首先,为什么实际上有必要关闭 TimeOutExecutor 以便 awaitTermination(( 返回。在我的子类中,awaitTermination((没有被覆盖,所以如果所有任务都已完成,TiumeOutExecutor(awaitTermination((不知道是否关闭,这有什么关系(
其次,为什么---线程池队列为空有时会多次获得输出
超时执行器是单线程的,这是正确的/必要的吗?
基于霍尔格斯答案的更新
所以你遇到的问题是你正在关闭 timeoutExecutor 太早了,因此它可能会错过一个或多个 用于中断线程池执行程序的挂起任务的任务。
我现在看到空队列只是意味着所有任务都已完成或已开始。(抱歉,我的示例测试以前具有误导性,它运行了 10 多个临时编辑任务,并且在生产代码中,工作线程的数量基于用户计算机上的 CPU 数量(。
所以你是说我过早地关闭((timeoutExecutor(可能有高达WorkerSize -1的任务仍在运行(,这意味着所有仍在为尚未完成的任务运行的timeoutExecutor都被中断了。因此,如果剩余的其中任何一个由于某种原因未自行完成,则它们的超时任务将不再存在,因此不能用于中断它们。但是 awaitTermination(( woiuldnt 返回的唯一原因是,如果这些最后 (WorkerSize -1( 任务之一没有完成。
我自愿将之前执行(( 更改为
protected void afterExecute(Runnable r, Throwable t) {
ScheduledFuture timeoutTask = runningTasks.remove(r);
if(timeoutTask != null) {
timeoutTask.cancel(false);
}
if (isShutdown)
{
if(getQueue().isEmpty())
{
if(runningTasks.size()==0)
{
this.shutdownNow();
}
}
}
}
为了确保它会完成,我使用了shutdownNow((,但直到一切都完成,但根据您的评论,这仍然可能无法正常工作
我应该做
protected void afterExecute(Runnable r, Throwable t) {
ScheduledFuture timeoutTask = runningTasks.remove(r);
if(timeoutTask != null) {
timeoutTask.cancel(false);
}
}
和
protected void terminated()
{
timeoutExecutor.shutdown();
}
并且一旦提交的所有任务都完成(自然或通过相应的超时执行器取消(就会调用 terminated(( 此时超时执行器仍然存在并不重要?
对于修改我的测试用例的完整性,除非超时任务正常工作,否则任务将花费很长时间,显示原始解决方案失败(挂起(并且修订后的解决方案正常工作
public void testThreadPoolTasksCancelled() throws Exception
{
Instant t1, t2;
t1 = Instant.now();
final TimeoutThreadPoolExecutor executorService = new TimeoutThreadPoolExecutor(10, 10, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), 3, TimeUnit.SECONDS);
for (int i = 0; i < 50; i++)
{
executorService.submit(new Callable<Object>()
{
@Override
public Object call() throws Exception
{
Thread.sleep(500000000);
System.out.println("Done");
return null;
}
});
}
executorService.shutdown();
executorService.awaitTermination(1, TimeUnit.DAYS);
t2 = Instant.now();
System.out.println("Program done:"+(Duration.between(t1, t2).toMillis()/ 1000+ " seconds"));
}
队列仅包含尚未启动的作业。具有空队列并不意味着没有挂起的作业;它们可能只是为了被执行而被删除。特别是在您的示例代码中,假设空队列意味着没有正在运行的作业是致命的错误;由于已将执行程序配置为具有 10 个核心线程并提交 10 个作业,因此在示例代码的整个执行过程中,队列将始终为空。
因此,您遇到的问题是过早关闭timeoutExecutor
,因此它可能会错过一个或多个任务来中断线程池执行器的待处理任务。
请注意,原则上,作业甚至可能处于从队列中删除的状态(如果已添加(,但尚未调用beforeExecute
。因此,即使有一个空队列和一个空的runningTasks
映射也不能保证没有待处理的作业。
要回答您的另一个问题,您必须关闭timeoutExecutor
,因为它有一个关联的活动线程,该线程将始终使执行器保持活动状态。因此,不关闭它会产生内存泄漏并进一步保持线程活动状态,因此始终防止自动关闭 JVM。
但是,关闭timeoutExecutor
的正确位置是覆盖方法protected void terminated()
,该方法完全用于清理。
对于最后一个项目符号,您的timeoutExecutor
有多少线程并不重要,但考虑到任务的简单性,拥有多个线程没有任何好处,单线程执行器是最简单且可能最有效的解决方案。