我有一个宽度为 10 的固定线程池ExecutorService
,以及一个包含 100 个Callable
的列表,每个等待 20 秒并记录它们的中断。
我在单独的线程中调用该列表中的invokeAll
,并且几乎立即中断了该线程。 ExecutorService
执行按预期中断,但Callable
记录的实际中断数远远超过预期的 10 - 大约 20-40。如果ExecutorService
可以同时执行不超过 10 个线程,为什么会这样?
完整源代码:(由于并发性,您可能需要多次运行它(
@Test
public void interrupt3() throws Exception{
int callableNum = 100;
int executorThreadNum = 10;
final AtomicInteger interruptCounter = new AtomicInteger(0);
final ExecutorService executorService = Executors.newFixedThreadPool(executorThreadNum);
final List <Callable <Object>> executeds = new ArrayList <Callable <Object>>();
for (int i = 0; i < callableNum; ++i) {
executeds.add(new Waiter(interruptCounter));
}
Thread watcher = new Thread(new Runnable() {
@Override
public void run(){
try {
executorService.invokeAll(executeds);
} catch(InterruptedException ex) {
// NOOP
}
}
});
watcher.start();
Thread.sleep(200);
watcher.interrupt();
Thread.sleep(200);
assertEquals(10, interruptCounter.get());
}
// This class just waits for 20 seconds, recording it's interrupts
private class Waiter implements Callable <Object> {
private AtomicInteger interruptCounter;
public Waiter(AtomicInteger interruptCounter){
this.interruptCounter = interruptCounter;
}
@Override
public Object call() throws Exception{
try {
Thread.sleep(20000);
} catch(InterruptedException ex) {
interruptCounter.getAndIncrement();
}
return null;
}
}
使用 WinXP 32 位、Oracle JRE 1.6.0_27 和 JUnit4
我不同意你只应该收到 10 个中断的假设。
Assume the CPU has 1 core.
1. Main thread starts Watcher and sleeps
2. Watcher starts and adds 100 Waiters then blocks
3. Waiter 1-10 start and sleep in sequence
4. Main wakes and interrupts Watcher then sleeps
5. Watcher cancels Waiter 1-5 then is yielded by the OS (now we have 5 interrupts)
6. Waiter 11-13 start and sleep
7. Watcher cancels Waiter 6-20 then is yielded by the OS (now we have 13 interrupts)
8. Waiter 14-20 are "started" resulting in a no-op
9. Waiter 21-24 start and sleep
....
从本质上讲,我的论点是,不能保证观察者线程将被允许取消所有 100 个"服务员"RunnableFuture 实例,然后它必须产生时间片并允许 ExecutorService 的工作线程启动更多 Waiter 任务。
更新:显示来自AbstractExecutorService
的代码
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException {
if (tasks == null)
throw new NullPointerException();
List<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
boolean done = false;
try {
for (Callable<T> t : tasks) {
RunnableFuture<T> f = newTaskFor(t);
futures.add(f);
execute(f);
}
for (Future<T> f : futures) {
if (!f.isDone()) {
try {
f.get(); //If interrupted, this is where the InterruptedException will be thrown from
} catch (CancellationException ignore) {
} catch (ExecutionException ignore) {
}
}
}
done = true;
return futures;
} finally {
if (!done)
for (Future<T> f : futures)
f.cancel(true); //Specifying "true" is what allows an interrupt to be sent to the ExecutorService's worker threads
}
}
包含 f.cancel(true)
的 finally 块是中断传播到当前正在运行的任务的时间。如您所见,这是一个紧密循环,但不能保证执行循环的线程能够在一个时间片中循环访问所有Future
实例。
如果你想实现相同的行为
ArrayList<Runnable> runnables = new ArrayList<Runnable>();
executorService.getQueue().drainTo(runnables);
在中断线程池之前添加此块。
它会将所有等待队列排空到新列表中。
因此,它将仅中断正在运行的线程。
PowerMock.mockStatic ( Executors.class );
EasyMock.expect ( Executors.newFixedThreadPool ( 9 ) ).andReturn ( executorService );
Future<MyObject> callableMock = (Future<MyObject>)
EasyMock.createMock ( Future.class );
EasyMock.expect ( callableMock.get ( EasyMock.anyLong (), EasyMock.isA ( TimeUnit.class ) ) ).andReturn ( ccs ).anyTimes ();
List<Future<MyObject>> futures = new ArrayList<Future<MyObject>> ();
futures.add ( callableMock );
EasyMock.expect ( executorService.invokeAll ( EasyMock.isA ( List.class ) ) ).andReturn ( futures ).anyTimes ();
executorService.shutdown ();
EasyMock.expectLastCall ().anyTimes ();
EasyMock.expect ( mock.getMethodCall ( ) ).andReturn ( result ).anyTimes ();
PowerMock.replayAll ();
EasyMock.replay ( callableMock, executorService, mock );
Assert.assertEquals ( " ", answer.get ( 0 ) );
PowerMock.verifyAll ();