计费类具有与国家/地区计费相关的所有逻辑。它从数据库获取结果,然后向用户收费。Billing类实现Runnable。我想根据国家参数并行执行计费,以便大量用户(500万+)计费非常快。现在要花好几个小时才能完成。
我正试图实现ThreadPoolExecutor执行计费类,但我很困惑如何??下面的区别是什么?还是我做错了什么?请建议! !总共有20个国家,但我在这里只粘贴了5个。
//for 20 countries ThreadPoolExecutor (20,20,20.......)????
ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 5, 5, TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>(10), new ThreadPoolExecutor.CallerRunsPolicy());
executor.execute(new Billing("UK"));
executor.execute(new Billing("USA"));
executor.execute(new Billing("Germany"));
executor.execute(new Billing("Spain"));
executor.execute(new Billing("Italy"));
或
ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 5, 5, TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>(10), new ThreadPoolExecutor.CallerRunsPolicy());
for(int i=0;i<5;i++) // for 20 countries i<20??
{
executor.execute(new Billing("UK"));
executor.execute(new Billing("USA"));
executor.execute(new Billing("Germany"));
executor.execute(new Billing("Spain"));
executor.execute(new Billing("Italy"));
}
while (! executor.isTerminated()) {
try{
executor.awaitTermination(100, TimeUnit.SECONDS);
}catch(InterruptedException iE)
{
iE.printStackTrace();
System.out.println("Executor Exception: "+ iE);
}
提前感谢!!
循环解决方案似乎不对。不需要多次执行相同的Runnable
。
您正在实例化ThreadPoolExecutor
,将corePoolSize
和maximumPoolSize
设置为5
,这意味着执行器将维护池中的线程数为5
,即使它们是空闲的。它还说池中不能有超过5
个线程。
有了这个设置,您可以预期最多有5
个线程并行运行,执行任务(Billing
对象)。
当您继续使用execute
方法向executor
提交Billing
对象时,它们将被添加到您提供的ArrayBlockingQueue
中。这个队列的大小是10
。有可能在某些实例队列已经达到最大容量,不能接受更多的任务,在这种情况下,任务被拒绝并交给ThreadPoolExecutor
构造函数中提供的RejectedExecutionHandler
。它的工作是使用实现的方法rejectedExecution
处理被拒绝的任务。
如果您想要查找是否有任何被拒绝的任务,您必须提供自己的RejectedExecutionHandler
而不是使用默认的ThreadPoolExecutor.CallerRunsPolicy
。你可以这样做:
ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 5, 5,
TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(10),
new RejectedExecutionHandler() {
@Override
public void rejectedExecution(Runnable r,
ThreadPoolExecutor executor) {
System.out.println("I got rejected: " + r);
if (!executor.isShutdown()) {
r.run();
}
}
});
首先:忘记循环
for(int i=0;i<5;i++) // for 20 countries i<20??
{
executor.execute(new Billing("UK"));
executor.execute(new Billing("USA"));
executor.execute(new Billing("Germany"));
executor.execute(new Billing("Spain"));
executor.execute(new Billing("Italy"));
}
this循环遍历所有账单。
正确的做法是在第一个代码片段中:executor.execute(new Billing("UK"));
executor.execute(new Billing("USA"));
executor.execute(new Billing("Germany"));
executor.execute(new Billing("Spain"));
executor.execute(new Billing("Italy"));
另一个错误位于终止检查:
while (! executor.isTerminated()) {
try{
executor.awaitTermination(100, TimeUnit.SECONDS);
}catch(InterruptedException iE)
{
iE.printStackTrace();
System.out.println("Executor Exception: "+ iE);
}
}
Executor.awaitTermination
的javadoc说:
在关机请求后阻塞直到所有任务完成执行,
但是你从来没有发出一个关机请求。
在您的情况下,您可以利用ExecutorCompletionService
,如:
CompletionService<String> ecs = new ExecutorCompletionService<String>(executor);
List<String> countries= Arrays.asList("UK","USA","Germany","Spain","Italy");
for(String country : countries) {
ecs.submit(new Billing(country),country);
}
// wait for completion
for(int i=0;i<countries.size();i++){
ecs.take(); // wait for next country completion
}
// all work completed, shutdown
executor.shutdownNow();
我不确定你是否理解循环是如何工作的。不同之处在于,第二段代码将在每个列出的国家/地区运行5次计费。
假设您正在谈论代码的for
循环部分,它如何工作并不明显。
理想的循环应该是这样的:
for(String country : countryCollection) {
executor.execute(new Billing(country));
}
您考虑过使用enum
吗?
static class Billing implements Runnable {
enum Country {
UK,
USA,
Germany,
Spain,
Italy;
}
public Billing(Country country) {
}
@Override
public void run() {
}
}
public void test() {
ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 5, 5, TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>(10),
new ThreadPoolExecutor.CallerRunsPolicy());
for ( Billing.Country country : Billing.Country.values() ) {
executor.execute(new Billing(country));
}
}
考虑实现这一点的另一种方法是看一下Fork/Join框架。这似乎真的可以从工作磨练中受益。例如,你可以把它拆分得相当干净。这将允许您分解用户或用户子集的计费任务,而不是让一个线程看起来代表一个国家的所有计费工作。
如果你使用的是Java <7