ThreadPoolExecutor正在运行的应用程序



计费类具有与国家/地区计费相关的所有逻辑。它从数据库获取结果,然后向用户收费。Billing类实现Runnable。我想根据国家参数并行执行计费,以便大量用户(500万+)计费非常快。现在要花好几个小时才能完成。

我正试图实现ThreadPoolExecutor执行计费类,但我很困惑如何??下面的区别是什么?还是我做错了什么?请建议! !总共有20个国家,但我在这里只粘贴了5个。

 //for 20 countries  ThreadPoolExecutor (20,20,20.......)????
ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 5, 5, TimeUnit.SECONDS, 
new ArrayBlockingQueue<Runnable>(10), new ThreadPoolExecutor.CallerRunsPolicy());
executor.execute(new Billing("UK")); 
executor.execute(new Billing("USA")); 
executor.execute(new Billing("Germany")); 
executor.execute(new Billing("Spain")); 
executor.execute(new Billing("Italy")); 

ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 5, 5, TimeUnit.SECONDS, 
new ArrayBlockingQueue<Runnable>(10), new ThreadPoolExecutor.CallerRunsPolicy());
for(int i=0;i<5;i++) // for 20 countries   i<20??
{    
   executor.execute(new Billing("UK")); 
   executor.execute(new Billing("USA")); 
   executor.execute(new Billing("Germany")); 
   executor.execute(new Billing("Spain")); 
   executor.execute(new Billing("Italy")); 
}
while (! executor.isTerminated()) {
   try{
      executor.awaitTermination(100, TimeUnit.SECONDS);
   }catch(InterruptedException iE)
   {
      iE.printStackTrace();
      System.out.println("Executor Exception: "+ iE);
   }

提前感谢!!

循环解决方案似乎不对。不需要多次执行相同的Runnable

您正在实例化ThreadPoolExecutor,将corePoolSizemaximumPoolSize设置为5,这意味着执行器将维护池中的线程数为5,即使它们是空闲的。它还说池中不能有超过5个线程。

有了这个设置,您可以预期最多有5个线程并行运行,执行任务(Billing对象)。

当您继续使用execute方法向executor提交Billing对象时,它们将被添加到您提供的ArrayBlockingQueue中。这个队列的大小是10。有可能在某些实例队列已经达到最大容量,不能接受更多的任务,在这种情况下,任务被拒绝并交给ThreadPoolExecutor构造函数中提供的RejectedExecutionHandler。它的工作是使用实现的方法rejectedExecution处理被拒绝的任务。

如果您想要查找是否有任何被拒绝的任务,您必须提供自己的RejectedExecutionHandler而不是使用默认的ThreadPoolExecutor.CallerRunsPolicy。你可以这样做:

ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 5, 5,
        TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(10),
        new RejectedExecutionHandler() {
            @Override
            public void rejectedExecution(Runnable r,
                    ThreadPoolExecutor executor) {
                System.out.println("I got rejected: " + r);
                if (!executor.isShutdown()) {
                    r.run();
                }
            }
        });

首先:忘记循环

for(int i=0;i<5;i++) // for 20 countries   i<20??
{    
   executor.execute(new Billing("UK")); 
   executor.execute(new Billing("USA")); 
   executor.execute(new Billing("Germany")); 
   executor.execute(new Billing("Spain")); 
   executor.execute(new Billing("Italy")); 
}

this循环遍历所有账单。

正确的做法是在第一个代码片段中:
executor.execute(new Billing("UK")); 
executor.execute(new Billing("USA")); 
executor.execute(new Billing("Germany")); 
executor.execute(new Billing("Spain")); 
executor.execute(new Billing("Italy")); 

另一个错误位于终止检查:

while (! executor.isTerminated()) {
   try{
      executor.awaitTermination(100, TimeUnit.SECONDS);
   }catch(InterruptedException iE)
   {
      iE.printStackTrace();
      System.out.println("Executor Exception: "+ iE);
   }
}

Executor.awaitTermination的javadoc说:

在关机请求后阻塞直到所有任务完成执行,

但是你从来没有发出一个关机请求。

在您的情况下,您可以利用ExecutorCompletionService,如:

CompletionService<String> ecs = new ExecutorCompletionService<String>(executor);
List<String> countries= Arrays.asList("UK","USA","Germany","Spain","Italy");   
for(String country : countries) {
    ecs.submit(new Billing(country),country);
}
// wait for completion
for(int i=0;i<countries.size();i++){
      ecs.take(); // wait for next country completion
}
// all work completed, shutdown
executor.shutdownNow();

我不确定你是否理解循环是如何工作的。不同之处在于,第二段代码将在每个列出的国家/地区运行5次计费。

假设您正在谈论代码的for循环部分,它如何工作并不明显。

理想的循环应该是这样的:

for(String country : countryCollection) {
    executor.execute(new Billing(country));
}

您考虑过使用enum吗?

static class Billing implements Runnable {
  enum Country {
    UK,
    USA,
    Germany,
    Spain,
    Italy;
  }
  public Billing(Country country) {
  }
  @Override
  public void run() {
  }
}
public void test() {
  ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 5, 5, TimeUnit.SECONDS,
       new ArrayBlockingQueue<Runnable>(10), 
          new ThreadPoolExecutor.CallerRunsPolicy());
  for ( Billing.Country country : Billing.Country.values() ) {
    executor.execute(new Billing(country));
  }
}

考虑实现这一点的另一种方法是看一下Fork/Join框架。这似乎真的可以从工作磨练中受益。例如,你可以把它拆分得相当干净。这将允许您分解用户或用户子集的计费任务,而不是让一个线程看起来代表一个国家的所有计费工作。

如果你使用的是Java <7

相关内容

  • 没有找到相关文章

最新更新