终止多租户的线程池中的线程



我正在为作业创建一个线程池,如下所示。

public class MoveToCherwellThreadPool {
public static ThreadPoolExecutor cherwellMoveThreadPoolExecutor = null;
private static EMLogger logger = EMLogger.getLogger();
private static final String CLASSNAME = "MoveToCherwellThreadPool";
public static void initiateCherwellMoveThreadPool() {
BlockingQueue<Runnable> q = new LinkedBlockingQueue<Runnable>(100000);
cherwellMoveThreadPoolExecutor = new ThreadPoolExecutor(10,20, 20, TimeUnit.SECONDS, q);
cherwellMoveThreadPoolExecutor.setRejectedExecutionHandler(new RejectedExecutionHandler() {
@Override
public void rejectedExecution(Runnable r,
ThreadPoolExecutor executor) {
logger.logDebug(CLASSNAME,"Rejected task cherwellMoveThreadPoolExecutor Active tasks : " + cherwellMoveThreadPoolExecutor.getActiveCount() + ", " + "cherwellMoveThreadPoolExecutor Completed tasks : " + cherwellMoveThreadPoolExecutor.getCompletedTaskCount()+" Waiting for a second !! ");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
executor.execute(r);
}
});
}

}

我在为多个客户运行的过程中使用了这个。对于每个客户,将初始化新的线程池,并运行线程。下面是我使用线程池的代码。

for (Object[] objects : relationshipList) {
CherwellRelationshipMoveThread relationshipThread = new CherwellRelationshipMoveThread(objects,
this.customerId, sb, credential,mainCIId,moveUniqueId,this.startTime);
CompletableFuture<?> future = CompletableFuture.runAsync(relationshipThread,
MoveToCherwellThreadPool.cherwellMoveThreadPoolExecutor);
crelationshipList.add(future);
}
crelationshipList.forEach(CompletableFuture::join);

此线程将为多个客户创建。我在UI中提供了一个终止此作业的选项。在点击停止进程时,我只需要停止/终止为该特定客户运行的线程,其他客户的线程不应该受到伤害,应该继续运行。

在UI中点击停止过程时,我正在调用一个服务,在该服务中,我的代码将是

MoveToCherwellThreadPool.cherwellMoveThreadPoolExecutor.shutdownNow();

我正在ThreadPoolExecutor上调用shutdownNow((。

这正在扼杀所有客户的所有线索。我不想杀死所有的客户流程,但只为客户点击停止流程。

此代码不维护从租户到线程池的任何映射,只有一个对ThreadPoolExecutor的静态引用。每次调用initiateCherwellMoveThreadPool时,任何现有的执行器都会被一个新的执行器替换,并且现有的执行程序不会关闭,因此它会泄漏资源。因此,这将执行来自同一线程池中多个租户的任务。

此代码也不是线程安全的。线程有可能(如果不太可能的话(在调用setRejectedExecutionHandler之前,在新创建的执行器上调度任务,甚至关闭它。

如果您需要每个租户有一个单独的执行人,则需要实现这一点。例如,一个好的选择可能是使用带有customerId密钥和ThreadPoolExecutor值的ConcurrentHashMap(为简洁起见,省略了日志记录(:

public class MoveToCherwellThreadPool {
public static ConcurrentMap<String, ThreadPoolExecutor> cherwellMoveThreadPoolExecutors = new ConcurrentHashMap<>();
public static ThreadPoolExecutor getCherwellMoveThreadPool(String customerId) {
return cherwellMoveThreadPoolExecutors.computeIfAbsent(customerId, id -> {
BlockingQueue<Runnable> q = new LinkedBlockingQueue<Runnable>(100000);
ThreadPoolExecutor executor = new ThreadPoolExecutor(10, 20, 20, TimeUnit.SECONDS, q);
executor.setRejectedExecutionHandler(new RejectedExecutionHandler() { /*...*/ });
return executor;
});
}
public static List<Runnable> stopCherwellMoveTheadPool(String customerId) {
if (cherwellMoveThreadPoolExecutors.containsKey(customerId)) {
return cherwellMoveThreadPoolExecutors.get(customerId).shutdownNow();
}
return Collections.emptyList();
}
}

这可以这样使用:

CompletableFuture<?> future = CompletableFuture.runAsync(relationshipThread,
MoveToCherwellThreadPool.getCherwellMoveThreadPool(customerId));

同样重要的是要意识到,调用shutdownNow只能尝试取消当前执行的任务,并且";不等待主动执行任务终止":

此实现通过Thread.interrupt()取消任务,因此任何未能响应中断的任务都可能永远不会终止。

实现CherwellRelationshipMoveThread的代码没有显示,所以情况可能是这样,也可能不是这样。