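I would like to validate the design of a multithreaded application I wrote and get clarification/reassurance on a few points. I apologize in advance for such a long post. I thought about splitting it into several questions, but then I would have had to reference the same code, and the questions all seem interconnected, so I opted to put everything into one post. If this is not appropriate, please let me know and I will split it into multiple posts.
Here is what I have:
- BatchService (Spring singleton bean): accepts requests to upload a specified directory or zip archive. For that it owns an ExecutorService pool. On each request it submits a new BatchUploader Callable task to the pool and stores the returned Future in a map keyed by upload ID, all within one transactional (TX) method. It provides methods to get the status of all uploads and to cancel all uploads. It also starts a new BatchMonitor thread that monitors the progress of the uploads and updates the queues holding information about completed and not-completed uploads. Finally, it cleans up all resources when the bean is about to be destroyed (using Spring's PreDestroy callback).
- BatchUploader is a Callable task that also has its own ExecutorService pool for uploading individual files. In its call() method it scans the directory or zip archive and, for each file, submits a SingleFileUploader Callable task to its pool.
- SingleFileUploader is a Callable task; in its call() method it does all the work of uploading and processing the file and returns some status.
Here is some real and some pseudo code: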
public class BatchService {
private ExecutorService servicePool;
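// uploadId -> Future of the running batch; the rest of the class refers to this map as activeBatchFutures
private ConcurrentHashMap<String, Future<SingleBatchUploadResult>> activeBatchFutures = new ConcurrentHashMap<String, Future<SingleBatchUploadResult>>();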
// keep last 100 unsuccessful uploads
private ConcurrentLinkedQueue<SingleBatchUploadResult> notCompletedBatches = new ConcurrentLinkedQueue<SingleBatchUploadResult>();
// keep last 100 successful uploads
private ConcurrentLinkedQueue<String> completedBatches = new ConcurrentLinkedQueue<String>();
private Thread monitorThread;
public BatchService() {
servicePool = Executors.newFixedThreadPool(MAX_BATCH_UPLOAD_THREADS);
monitorThread = new Thread(new BatchMonitor());
monitorThread.setDaemon(true);
monitorThread.start();
}
@Transactional
public void processUpload(String uploadId, String contentName) {
Future<SingleBatchUploadResult> taskFuture = servicePool.submit(new BatchUploader(uploadId, contentName));
activeBatchFutures.put(uploadId, taskFuture);
}
@PreDestroy
public void preDestroy() {
// stop the monitor thread
monitorThread.interrupt();
// stop all executors and their threads
cancelAllTasks();
}
public void cancelAllTasks(){
List<Runnable> waitingTasks = servicePool.shutdownNow();
for (Runnable task: waitingTasks){
// examine which tasks are still waiting, if necessary
}
}
public boolean cancelBatchById(String uploadId){
Future<SingleBatchUploadResult> resultFuture = activeBatchFutures.get(uploadId);
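// isDone() is also true for cancelled tasks, so this single check covers both cases
if (resultFuture != null && !resultFuture.isDone()){
// cancel() returns false if the task completed in the meantime
return resultFuture.cancel(true);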
}
// this task was either already finished, cancelled, not submitted or unknown
return false;
}
public void getCurrentStatus(){
// just print out the sizes of queues for now
System.out.println("number of active uploads: " + activeBatchFutures.size());
System.out.println("number of successfully completed uploads: " + completedBatches.size());
System.out.println("number of failed uploads: " + notCompletedBatches.size());
}
public class BatchMonitor implements Runnable {
@Override
public void run() {
boolean cont = true;
while (cont) {
if (Thread.currentThread().isInterrupted()){
// the thread is being shut down - get out
cont = false;
break;
}
Iterator<Entry<String, Future<SingleBatchUploadResult>>> iterator = activeBatchFutures.entrySet().iterator();
// remove completed Futures from the map
// add successfully completed batches to completedBatches queue
// add all other batches to notCompletedBatches queue
while (iterator.hasNext() && cont){
…
if (batchUploadFuture.isCancelled()) {
addToNotCompleted(defaultResult);
// remove this future from the active list
activeBatchFutures.remove(uploadId);
} else if (batchUploadFuture.isDone()){
try {
SingleBatchUploadResult result = batchUploadFuture.get();
if (UploadStatus.SUCCESS.equals(result.getUploadStatus()))
addToCompleted(uploadId);
else
addToNotCompleted(result);
} catch (InterruptedException e) {
// the thread is being shut down - stop processing
cont = false;
// preserve interruption state of the thread
Thread.currentThread().interrupt();
break;
} catch (ExecutionException e) {
addToNotCompleted(defaultResult);
}
// remove this future from the active list
activeBatchFutures.remove(uploadId);
} else {
// the task has not finished yet - let it be
// TODO if a Future is not complete - see how old it is [how ?] If older than the timeout - cancel it
// For now, rely on the ExecutorService timeout set on the BatchUploader
}
}
// try to sleep for 5 sec, unless the thread is being shutdown
if (!Thread.currentThread().isInterrupted()){
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
cont = false;
// preserve interruption state of the thread
Thread.currentThread().interrupt();
}
}
}
System.out.println("BatchMonitor.run() has terminated");
}
public void addToCompleted(String uploadId){
int currentSize = completedBatches.size();
// bring the size of the queue below MAX
if (currentSize >= MAX_SUCCESSFUL_RESULTS) {
// number of oldest entries to evict so that there is room for the new one
int delta = currentSize - MAX_SUCCESSFUL_RESULTS + 1;
while (delta > 0){
completedBatches.poll();
delta--;
}
}
completedBatches.offer(uploadId);
}
public void addToNotCompleted(SingleBatchUploadResult result){
int currentSize = notCompletedBatches.size();
// bring the size of the queue below MAX
if (currentSize >= MAX_UNSUCCESSFUL_RESULTS) {
// number of oldest entries to evict so that there is room for the new one
int delta = currentSize - MAX_UNSUCCESSFUL_RESULTS + 1;
while (delta > 0){
notCompletedBatches.poll();
delta--;
}
}
notCompletedBatches.offer(result);
}
}
}
public class BatchUploader implements Callable<SingleBatchUploadResult> {
private ExecutorService executorService;
// Map<fileName, Future result> - holds Futures for all files that were submitted for upload (those that did not fail validation)
private ConcurrentHashMap<String, Future<SingleFileUploadResult>> uploadTaskFutures = new ConcurrentHashMap<String, Future<SingleFileUploadResult>>();
private ConcurrentHashMap<String, SingleFileUploadResult> notUploadedFiles = new ConcurrentHashMap<String, SingleFileUploadResult>();
private int totalFilesToUpload = 0;
public BatchUploader(...) {
executorService = Executors.newFixedThreadPool(MAX_THREADS_PER_BATCH);
}
public SingleBatchUploadResult call() {
// do some validation
if ( this is a correct ZIP file){
String errorMessage = processZipArchive(threadName, contentName);
// the errorMessage will be not null if there were some exceptions that happened during the zip archive read:
// opening the ZIP archive, reading entries or thread interruption exceptions
if (errorMessage != null) {
...
return errorBatchUploadResult;
}
}
// all tasks are submitted - stop the service from accepting new requests and shutdown when done
executorService.shutdown();
// now wait until all tasks have finished - but only up to BATCH_UPLOAD_TIMEOUT_IN_SEC seconds
try {
executorService.awaitTermination(BATCH_UPLOAD_TIMEOUT_IN_SEC, TimeUnit.SECONDS);
} catch (InterruptedException e) {
// try to shutdown all running tasks and stop waiting tasks from being scheduled;
executorService.shutdownNow();
// preserve interruption state of the thread
Thread.currentThread().interrupt();
return errorBatchUploadResult;
}
// at this point, we either finished all tasks (awaitTermination finished before timeout),
// or we timed out waiting. Get the latest status of each task
List<String> successfullyUploadedFiles = new LinkedList<String>();
for (String entryName : uploadTaskFutures.keySet()) {
Future<SingleFileUploadResult> future = uploadTaskFutures.get(entryName);
try {
if (future.isCancelled()) {
...
notUploadedFiles.putIfAbsent(entryName, taskResult);
} else if (future.isDone()) {
// this task has finished
taskResult = future.get();
if (taskResult.getUploadStatus().equals(UploadStatus.SUCCESS))
successfullyUploadedFiles.add(entryName);
else
notUploadedFiles.putIfAbsent(entryName, taskResult);
} else {
// this task is either not started yet or not finished yet
…
notUploadedFiles.putIfAbsent(entryName, sometaskResult);
}
} catch (InterruptedException e){
// this is a signal to stop processing
batchUploadResult.setTotalFilesToUpload(totalFilesToUpload);
batchUploadResult.setNotUploadedFiles(notUploadedFiles);
batchUploadResult.setSuccessfullyUploadedFiles(successfullyUploadedFiles);
batchUploadResult.setStatusMessage(statusMessage);
batchUploadResult.setUploadStatus(UploadStatus.PARTIAL_FAILURE);
// cancel/stop all executing/waiting SingleFileUpload tasks
executorService.shutdownNow();
// preserve interruption state of the thread
Thread.currentThread().interrupt();
return batchUploadResult;
} catch (ExecutionException e) {
// we do not know what the state of this task is
…
notUploadedFiles.putIfAbsent(entryName, sometaskResult);
}
}
...
return batchUploadResult;
}
private String processZipArchive(String threadName, String zipName) {
// do all ZIP-reading work here
while ( valid file found )
{
if (Thread.currentThread().isInterrupted()){
// this batch uploader thread is being shut down - stop all SingleFileUpload tasks
executorService.shutdownNow();
return errorMessage;
}
// do a try while processing individual files to be able to gather info about failed files but continue processing good ones
try {
// read the file and pass it for processing to SingleFileUploader
Future<SingleFileUploadResult> taskFuture = executorService.submit(new SingleFileUploader(uploadId, bytesContent, zipEntryName));
uploadTaskFutures.put(zipEntryName, taskFuture);
...
} catch (some exceptions) {
notUploadedFiles.put(zipEntryName, taskResult);
}
}
return errorMessage;
}
}
public class SingleFileUploader implements Callable<SingleFileUploadResult> {
...
@Override
public SingleFileUploadResult call() {
// check if there was a cancellation request
if (Thread.currentThread().isInterrupted()){
// this file uploader thread is being shut down - get out
return errorResult;
}
// do the real work here
return result;
}
}
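All of this works fine in the regular scenarios. However, I would still like to hear your opinion on whether there are better/more reliable ways to do what I am trying to do, especially in the following areas:
I am using a separate thread, BatchMonitor, to keep track of what is active, done, and not done yet, by periodically scanning the list of active Futures and moving them into either the "successfully completed" or the "not completed [failed]" queue. I am wondering if there is a better way to do this?
For that I use concurrent unbounded queues and cap them at a specified maximum size myself as I keep adding items. I could not find a "bounded concurrent queue" in the standard JDK libs, only unbounded ones. I was hoping to use Guava's EvictingQueue, but it is bundled into the 15.0 release, which does not seem to be out yet... So I decided to limit the size of the queues myself, at the cost of using the size() operation, which I know is a problem for concurrent queues because it does a full scan of the queue... My reasoning is that this may be OK if I keep the queues small (as they are in my case).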
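For illustration only, here is a minimal sketch of the "keep the last N results" behaviour described above without calling size() on a ConcurrentLinkedQueue. It assumes the JDK's ArrayBlockingQueue is acceptable here; RecentHistory, add() and snapshot() are hypothetical names and are not part of the posted code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;

/** Hypothetical helper: keeps only the most recent `capacity` entries. */
final class RecentHistory<T> {
    private final ArrayBlockingQueue<T> queue;

    RecentHistory(int capacity) {
        this.queue = new ArrayBlockingQueue<>(capacity);
    }

    /** Adds an entry, evicting the oldest one(s) while the queue is full. */
    void add(T item) {
        // offer() returns false instead of blocking when the queue is full
        while (!queue.offer(item)) {
            queue.poll(); // drop the oldest entry and retry
        }
    }

    /** Point-in-time copy, oldest entry first. */
    List<T> snapshot() {
        return new ArrayList<>(queue);
    }

    int size() {
        return queue.size();
    }
}

class RecentHistoryDemo {
    public static void main(String[] args) {
        RecentHistory<String> completed = new RecentHistory<>(3);
        for (int i = 1; i <= 5; i++) {
            completed.add("upload-" + i);
        }
        System.out.println(completed.snapshot()); // [upload-3, upload-4, upload-5]
    }
}
```

With a single writer thread, as with BatchMonitor here, exactly one entry is evicted per insertion once the queue is full. With several writers the loop could occasionally evict more than one entry, but the capacity is never exceeded.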
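Do I need concurrent queues at all? The only thread that modifies the queues is the BatchMonitor thread, and the only other thread that reads them is the BatchService thread. The only situation in which I could get out of sync is when BatchService tries to get the status of a particular upload: the upload may already have been removed from the activeBatchFutures map but not yet placed into the "completed" or "not completed" queue, because I deliberately do not synchronize reads/writes between the map and the queues, to avoid unnecessary locking. But I can live with occasionally returning a "not found" status for a particular upload; asking for the status a second time would get the correct result.
BatchService is a singleton bean, which brings its own scalability issues, since all requests to this bean will be throttled. Another option would be to make each BatchUploader a Spring bean and limit the number of beans, but then how would I do the overall monitoring?
Handling timeouts and cancellations: I am trying to make this application bulletproof with respect to resource cleanup. I am trying to handle all thread-interruption cases and stop processing so that the threads can be killed. I rely on InterruptedException being caught and handled in BatchUploader, which propagates the event to the individual SingleFileUploader tasks by calling shutdownNow() on the batch pool (executorService in the code above). Can you see any potential cases where I might end up with runaway threads? What about when the JVM shuts down, or when the application is redeployed in a web container?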
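One path that may be worth double-checking: in the code as posted, nothing visible calls shutdownNow() when awaitTermination() in BatchUploader.call() returns false on timeout, so tasks still running at that point could keep their pool threads alive. The sketch below follows the two-phase shutdown idiom from the ExecutorService documentation. PoolShutdown and shutdownAndAwait are hypothetical names, and whether running uploads should be interrupted on timeout is an assumption about the intended behaviour.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Hypothetical helper illustrating a two-phase pool shutdown. */
final class PoolShutdown {
    /** Returns true if the pool terminated, false if tasks ignored the interrupt or we were interrupted. */
    static boolean shutdownAndAwait(ExecutorService pool, long timeout, TimeUnit unit) {
        pool.shutdown(); // phase 1: stop accepting new tasks, let submitted ones finish
        try {
            if (pool.awaitTermination(timeout, unit)) {
                return true;
            }
            // phase 2: timed out, so interrupt running tasks and discard queued ones
            pool.shutdownNow();
            return pool.awaitTermination(timeout, unit);
        } catch (InterruptedException e) {
            // the waiting thread itself is being shut down: cancel everything
            pool.shutdownNow();
            // preserve the interruption state for the caller
            Thread.currentThread().interrupt();
            return false;
        }
    }
}

class PoolShutdownDemo {
    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        // a long-running task that responds to interruption (sleep throws InterruptedException)
        pool.submit(() -> { Thread.sleep(60_000); return null; });
        boolean terminated = PoolShutdown.shutdownAndAwait(pool, 200, TimeUnit.MILLISECONDS);
        System.out.println("terminated: " + terminated);
    }
}
```

This only helps if the tasks actually respond to interruption. A task blocked in non-interruptible I/O, or one that never checks the interrupt flag, will keep its thread alive regardless.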
Thanks!
Marina
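- Use Guava's ListenableFuture instead of your BatchMonitor. A ListenableFuture can execute a callback as soon as the Future completes, which removes the need for a thread that monitors your Futures.
- Use an ArrayBlockingQueue, which is a bounded concurrent queue. Use take in the consumer thread to remove an item, blocking if the queue is empty, and use offer(E e, long timeout, TimeUnit unit) in the producer thread to add an item, blocking (for timeout units) if the queue is full.
- If you use ListenableFutures, you should not need the BatchMonitor or the concurrent queues.
- I recommend that you check Thread.currentThread().isInterrupted() on each iteration of your for (String entryName : uploadTaskFutures.keySet()) loop, because you are not calling a method that throws InterruptedException on all code paths (for example, if you keep going through the else path, it could be a while before you notice that the interrupted flag has been set).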
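As a rough sketch of the callback idea in the first and third points, the example below swaps in the JDK's CompletableFuture (Java 8+) for Guava's ListenableFuture so that it runs without extra dependencies. With Guava, the analogous calls would be MoreExecutors.listeningDecorator and Futures.addCallback. UploadTracker, its fields, and the Supplier standing in for BatchUploader are all hypothetical.

```java
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;

/** Hypothetical tracker: completion callbacks replace the polling monitor thread. */
final class UploadTracker {
    final Map<String, CompletableFuture<Boolean>> active = new ConcurrentHashMap<>();
    final Queue<String> completed = new ConcurrentLinkedQueue<>();
    final Queue<String> notCompleted = new ConcurrentLinkedQueue<>();
    private final ExecutorService pool;

    UploadTracker(ExecutorService pool) {
        this.pool = pool;
    }

    /** `upload` stands in for BatchUploader and returns true on success. */
    CompletableFuture<Void> submit(String uploadId, Supplier<Boolean> upload) {
        CompletableFuture<Boolean> future = CompletableFuture.supplyAsync(upload, pool);
        active.put(uploadId, future);
        // Runs once, as soon as the task succeeds, fails, or is cancelled.
        return future.handle((success, error) -> {
            // Record the outcome first, then drop the entry from the active map,
            // so a status lookup always finds the upload in at least one place.
            if (error == null && Boolean.TRUE.equals(success)) {
                completed.add(uploadId);
            } else {
                notCompleted.add(uploadId);
            }
            active.remove(uploadId);
            return null;
        });
    }
}

class UploadTrackerDemo {
    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        UploadTracker tracker = new UploadTracker(pool);
        CompletableFuture<Void> ok = tracker.submit("batch-1", () -> true);
        CompletableFuture<Void> bad = tracker.submit("batch-2", () -> {
            throw new IllegalStateException("simulated failure");
        });
        CompletableFuture.allOf(ok, bad).join(); // wait until both callbacks have run
        System.out.println("completed=" + tracker.completed + " failed=" + tracker.notCompleted);
        pool.shutdown();
    }
}
```

One difference matters for the cancellation concerns in the question: CompletableFuture.cancel(true) does not interrupt the thread running the task, whereas cancelling a Future returned by ExecutorService.submit does. If interrupt-based cancellation is required, the Guava route or plain Futures may be the better fit.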