How does a thread pool's RejectedExecutionHandler work?



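My goal is to limit memory usage when processing a large file. To do that, I am using a thread pool set up so that no more data can be loaded from the file than can be processed at any given time.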

try (CSVParser parser = new CSVParser(new File("...."))) {
    ThreadPoolExecutor executor = new ThreadPoolExecutor(10, 10, 5, TimeUnit.MINUTES, new ArrayBlockingQueue<>(1), new RejectedExecutionHandler() {
        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            r.run();
        }
    });
    for (Item item; (item = parser.nextItem()) != null;) {
        executor.submit(new ItemsProcessor(item));
    }
    executor.shutdown();
    executor.awaitTermination(12, TimeUnit.HOURS);
} catch (Exception e) {
    e.printStackTrace();
}

My understanding is that the rejectedExecution method of the RejectedExecutionHandler will run on the main thread, i.e. the thread that created the ThreadPoolExecutor. Is that right?

Does a rejected task run on the same thread that created the thread pool?

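As far as I understand, this approach loads at most 12 items into memory: 10 being processed by the pool's threads, one waiting in the pool's queue, and one that was rejected and is running on the same thread as the loop (which pauses the loop).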

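You are right, the RejectedExecutionHandler runs in the main thread. More precisely, it runs on whichever thread calls submit() or execute(); in your code that is the main thread, which also happens to be the thread that created the pool.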

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class TestRejectedExecution
{
    public static void main( String[] args )
    {
        Runnable r = () -> {
            Thread cur = Thread.currentThread();
            System.out.println( String.format( "in runnable, thread id: %s, name: %s, group name %s",
                                               cur.getId(), cur.getName(), cur.getThreadGroup().getName() ) );
            try
            {
                Thread.sleep( 5000 );
            }
            catch ( InterruptedException e )
            {
                e.printStackTrace();
            }
        };
        Thread cur = Thread.currentThread();
        System.out.println( String.format( "in main, thread id: %s, name: %s, group name %s",
                                           cur.getId(), cur.getName(), cur.getThreadGroup().getName() ) );
        try {
            ThreadPoolExecutor executor = new ThreadPoolExecutor( 1, 1, 0, TimeUnit.MINUTES, new ArrayBlockingQueue<>( 2),
                                                                  ( r1, executor1 ) -> {
                                                                      Thread cur1 = Thread.currentThread();
                                                                      System.out.println( String.format( "in REH, thread id: %s, name: %s, group name %s",
                                                                                                         cur1.getId(), cur1
                                                                                                             .getName(), cur1
                                                                                                             .getThreadGroup().getName() ) );
                                                                  } );
            for (int i=0; i<5; i++ ) {
                executor.submit( r );
            }
            executor.shutdown();
            executor.awaitTermination(1, TimeUnit.MINUTES);
        }
        catch (Exception e)
        {
            e.printStackTrace();
        }
    }
}

This is the output:

in main, thread id: 1, name: main, group name main
in REH, thread id: 1, name: main, group name main
in REH, thread id: 1, name: main, group name main
in runnable, thread id: 11, name: pool-1-thread-1, group name main
in runnable, thread id: 11, name: pool-1-thread-1, group name main
in runnable, thread id: 11, name: pool-1-thread-1, group name main
Process finished with exit code 0
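
Note that the handler in the demo above only prints and then drops the rejected task, whereas the handler in the question runs it. For the question's goal, the JDK's built-in ThreadPoolExecutor.CallerRunsPolicy does roughly the same thing as calling r.run() by hand, except that it discards the task if the executor has already been shut down. Below is a minimal sketch, not a definitive implementation, that counts how many tasks are submitted but unfinished at any moment. The class name CallerRunsSketch, the method maxInFlight and the 5 ms sleep are made up for illustration.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class CallerRunsSketch {

    /**
     * Submits `tasks` short jobs to a bounded pool and returns the largest
     * number of jobs that were handed over but not yet finished at any moment.
     * (Hypothetical helper, for illustration only.)
     */
    static int maxInFlight(int poolSize, int queueCapacity, int tasks) {
        AtomicInteger inFlight = new AtomicInteger();
        AtomicInteger max = new AtomicInteger();

        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                poolSize, poolSize, 0, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(queueCapacity),
                // Built-in handler: a rejected task runs on the submitting thread.
                new ThreadPoolExecutor.CallerRunsPolicy());

        for (int i = 0; i < tasks; i++) {
            // Count the item as "loaded" just before it is handed to the pool.
            int now = inFlight.incrementAndGet();
            max.accumulateAndGet(now, Math::max);
            executor.execute(() -> {
                try {
                    Thread.sleep(5); // stand-in for real work
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    inFlight.decrementAndGet();
                }
            });
        }

        executor.shutdown();
        try {
            executor.awaitTermination(1, TimeUnit.MINUTES);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return max.get();
    }

    public static void main(String[] args) {
        // With 10 threads and a queue of 1, this should not exceed 12.
        System.out.println("max in flight: " + maxInFlight(10, 1, 200));
    }
}
```

The count stays at or below poolSize + queueCapacity + 1 because the loop cannot read the next item while the submitting thread is busy running a rejected task. The trade-off is that the pool receives no new work from the loop during that time, so worker threads may sit idle if the caller-run task is slow.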

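RejectedExecutionHandler is the hook an executor provides for tasks it cannot accept, for example when the first executor has stopped working (has been shut down).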

If the first executor refuses to execute a task, the RejectedExecutionHandler is called.

    // Create Executor for Normal Execution
    public static ThreadPoolExecutor executor=(ThreadPoolExecutor) Executors.newFixedThreadPool(10);
    // Create Executor for Alternate Execution in case of first Executor shut down
    public static ThreadPoolExecutor alternateExecutor=(ThreadPoolExecutor) Executors.newFixedThreadPool(10);
// Create a RejectedExecutionHandler class and override the rejectedExecution method
public class MyRejectedExecutionHandler implements RejectedExecutionHandler {

    @Override
    public void rejectedExecution(Runnable worker, ThreadPoolExecutor executor) {
        System.out.println(worker.toString()+" is Rejected");
        System.out.println("Retrying to Execute");
        try{
            //Re-executing with alternateExecutor
            RejectedExecutionHandlerExample.alternateExecutor.execute(worker);
            System.out.println(worker.toString()+" Execution Started");
        }
        catch(Exception e)
        {
            System.out.println("Failure to re-execute "+e.getMessage());
        }
    }
}

// Register RejectedExecutionHandler in Main Class
RejectedExecutionHandler handler=new MyRejectedExecutionHandler();
executor.setRejectedExecutionHandler(handler);
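
For reference, a ThreadPoolExecutor hands a task to its handler in two situations: when it has been shut down, and when it is saturated (all threads busy and a bounded queue full). Since Executors.newFixedThreadPool uses an unbounded queue, only the first case applies to the executors above. Here is a small sketch of both cases, with hypothetical names (RejectionCausesSketch, rejectedWhenSaturated, rejectedAfterShutdown) and a handler that merely counts rejections:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class RejectionCausesSketch {

    /** Counts rejections when 5 blocking tasks hit 1 thread plus a queue of 2. */
    static int rejectedWhenSaturated() {
        AtomicInteger rejected = new AtomicInteger();
        CountDownLatch release = new CountDownLatch(1);
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                1, 1, 0, TimeUnit.SECONDS, new ArrayBlockingQueue<>(2),
                (task, pool) -> rejected.incrementAndGet());

        // Each task blocks until released, so the pool stays full while we submit.
        Runnable blocker = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        for (int i = 0; i < 5; i++) {
            executor.execute(blocker); // 1 runs, 2 are queued, 2 are rejected
        }
        release.countDown();
        executor.shutdown();
        return rejected.get();
    }

    /** Counts rejections when a task is submitted after shutdown(). */
    static int rejectedAfterShutdown() {
        AtomicInteger rejected = new AtomicInteger();
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                1, 1, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<>(),
                (task, pool) -> rejected.incrementAndGet());
        executor.shutdown();
        executor.execute(() -> { }); // unbounded queue, but the pool is shut down
        return rejected.get();
    }

    public static void main(String[] args) {
        System.out.println("saturated: " + rejectedWhenSaturated());      // prints "saturated: 2"
        System.out.println("after shutdown: " + rejectedAfterShutdown()); // prints "after shutdown: 1"
    }
}
```

If no handler is supplied, the default is ThreadPoolExecutor.AbortPolicy, which throws a RejectedExecutionException from execute() in both situations.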