监视对象池工作者的状态



我从另一个类中创建了这个线程,用于在完成时读取执行器的状态,并在失败时取消其余任务。可运行任务

如果看到任何失败,则总体状态必须为1或失败

final CompletionService completionService = new ExecutorCompletionService(getExecutorService());
final List<Future> futures = new ArrayList<Future>();
    FutureTask<Integer> tasks = new FutureTask<Integer>(new Callable<Integer>() {
        public Integer call() {
            int status = 0;
            boolean fail = false;
            try {
                for (int i = 0; i < 10; i++) {
                    MyRunnable resultObj = null;
                    try {
                        resultObj = (MyRunnable) completionService.take().get();
                    } catch (CancellationException e) {
                        // Skip it ..
                    }
                    if (!fail) {
                        status = resultObj.getStatus();
                        if (status == 1) {
                            fail = true;
                            for (Future future : futures) {
                                if (!future.isCancelled() && !future.isDone())
                                    future.cancel(true); // cancel pending tasks including running tasks 
                            }
                        }
                    }
                }
            } catch (Exception e) {
                 e.printStackTrace();
            }
            return status;
        }
            });

以上线程已启动-

ExecutorService pool = Executors.newSingleThreadExecutor();
pool.submit(tasks);

下面,Object是从池中借用的,它是一个阻塞调用,我将池大小设置为3因此,最初立即创建了3个MyRunnable工人。当每个worker完成后,它们被重用以服务于其他任务。

for (int i = 0 ; i < 10; i ++;) {
    MyRunnable myRunnable = null;
    myRunnable = (MyRunnable) this.getGenericObjectPool().borrowObject();
    set myRunnable ..
    futures.add(completionService.submit(myRunnable, myRunnable));
}
while (!tasks.isDone()) {
        try {
            Thread.sleep(Global.WaitTime());            
        } catch (InterruptedException iex) {            
        }
}
finalStatus = tasks.get();
pool.shutdown();

GenericObjectPool配置用于重用对象。我在IDE中模拟了一个测试,强迫第一个线程失败并将其状态设置为1。但是,问题是一旦它被释放,它就被borrowObject()重用,并且监视线程看到状态被设置为0的更改对象,作为激活新对象的一部分,这是由GenricObjectPool完成的。

所以,我无法从失败的线程读取状态。MyRunnable是不可调用的,所以我不得不欺骗Runnable使用completionService.submit(obj,obj)

如果使池大小为10或更多,则不会发生此问题,因为没有对象将被重用,并且我将成功读取每个对象的状态,但这不是一个选项。

我为Runnable创建了一个CallableDecorator来解决这个问题。现在我有适当的返回值,即使使用GenericObjectPool。由于现在不依赖Pool对象来读取状态,即使重用对象也不会导致状态重置-

那么,代码中的2个变化- Change

futures.add(completionService.submit(myRunnable, myRunnable));

futures.add(completionService.submit(new CallableDecorator(myRunnable)));

添加一个新类

public class CallableDecorator implements Callable {
       IRunnable r;
       public CallableDecorator(IRunnable r) {
           this.r = r;
       }
       public Integer call() {
           r.run();
           return r.statusCode();
       }
}
interface IRunnable extends Runnable {
     public Integer statusCode();
}

同样,为了在监视线程中获取其值,必须将resultObj更改为整数。

最新更新