Java fork join pool吃掉所有线程资源



我有一个字符串解析器(解析大型文本blob),需要在java分叉连接池中运行。该池比其他线程更快,并且在使用正则表达式和 xpath 时将我的解析时间减少了 30 多分钟。但是,正在创建的线程数量急剧攀升,我需要能够终止它们,因为线程池被多次调用。 如何在不将池限制为 4 核系统上的 1 个内核的情况下减少线程的增加?

我的线程数超过 40000,我需要它接近 5000,因为该程序正在运行 10 次,我的用户的冷执行限制为 50000 个线程。

此问题在Windows和Linux上都发生。

我是:

  • 将最大处理器数设置为可用处理器数*可配置数,当前为 1
  • 调用 get() 后取消任务
  • 在重新实例化之前拼命将叉接池设置为 null,因为我绝望了

任何帮助将不胜感激。谢谢。

这是我用来停止、获取和重新启动池的代码。我可能还应该注意,我使用 fjp.submit(TASK) 提交每个任务,然后在关闭时调用它们。

while(pages.size()>0) { log.info("当前活动线程:"+Thread.activeCount()); log.info("在迭代中找到的页面 "+j+": "+pages.size());

        if(fjp.isShutdown())
        {
            fjp=new ForkJoinPool(Runtime.getRuntime().availableProcessors()*procnum);
        }
        i=0;
        //if asked to generate a hash, due this first
        if(getHash==true){
            log.info("Generating Hash");
            int s=pages.size();
            while(i<s){
                String withhash=null;
                String str=pages.get(0);
                if(str != null){
                    jmap=Json.read(str).asJsonMap();
                    jmap.put("offenderhash",Json.read(genHash(jmap.get("offenderhash").asString()+i)));
                    for(String k:jmap.keySet()){
                        withhash=(withhash==null)?"{""+k+"":""+jmap.get(k).asString()+""":withhash+",""+k+"":""+jmap.get(k).asString()+""";
                    }
                    if(withhash != null){
                        withhash+=",}";
                    }
                    pages.remove(0);
                    pages.add((pages.size()-1), withhash);
                    i++;
                }
            }
            i=0;
        }
        if(singlepats != null)
        {
        log.info("Found Singlepats");
        for(String row:pages)
        {   
            String str=row;
            str=str.replaceAll("t|r|rn|n","");
            jmap=Json.read(str).asJsonMap();
            if(singlepats.containsKey("table"))
            {
                if(fjp.isShutdown())
                {
                    fjp=new ForkJoinPool((Runtime.getRuntime().availableProcessors()*procnum));
                }
                fjp=new ForkJoinPool((Runtime.getRuntime().availableProcessors()*procnum));
                if(jmap.get(column)!=null)
                {
                    if(test){
                        System.out.println("//////////////////////HTML////////////////////////n"+jmap.get(column).asString()+"n///////////////////////////////END///////////////////////////nn");
                    }
                    if(mustcontain != null)
                    {
                        if(jmap.get(column).asString().contains(mustcontain))
                        {
                            if(cannotcontain != null)
                            {
                                if(jmap.get(column).asString().contains(cannotcontain)==false)
                                results.add(fjp.submit(new ParsePage(replacementPattern,singlepats.get("table"),jmap.get(column).asString().replaceAll("\s\s", " "),singlepats, Calendar.getInstance().getTime().toString(), jmap.get("offenderhash").asString())));
                            }
                            else
                            {
                                results.add(fjp.submit(new ParsePage(replacementPattern,singlepats.get("table"),jmap.get(column).asString().replaceAll("\s\s", " "),singlepats, Calendar.getInstance().getTime().toString(), jmap.get("offenderhash").asString())));
                            }
                        }
                    }
                    else if(cannotcontain != null)
                    {
                        if(jmap.get(column).asString().contains(cannotcontain)==false)
                        {
                            results.add(fjp.submit(new ParsePage(replacementPattern,singlepats.get("table"),jmap.get(column).asString().replaceAll("\s\s", " "),singlepats, Calendar.getInstance().getTime().toString(), jmap.get("offenderhash").asString())));
                        }
                    }
                    else
                    {
                        results.add(fjp.submit(new ParsePage(replacementPattern,singlepats.get("table"),jmap.get(column).asString().replaceAll("\s\s", " "),singlepats, Calendar.getInstance().getTime().toString(), jmap.get("offenderhash").asString())));
                    }
                }
            }
            i++;
            if(((i%commit_size)==0 & i != 0) | i==pages.size() |pages.size()==1 & singlepats != null)
            {
                log.info("Getting Regex Results");
                log.info("Shutdown");
                try {
                    fjp.awaitTermination(termtime, TimeUnit.MILLISECONDS);
                } catch (InterruptedException e1) {
                    // TODO Auto-generated catch block
                    e1.printStackTrace();
                }
                fjp.shutdown();
                while(fjp.isTerminated()==false)
                {
                    try{
                        Thread.sleep(5);
                    }catch(InterruptedException e)
                    {
                        e.printStackTrace();
                    }
                }

                for(Future<String> r:results)
                {
                    try {
                        add=r.get();
                        if(add.contains("No Data")==false)
                        {
                            parsedrows.add(add);
                        }
                        add=null;
                        if(r.isDone()==false)
                        {
                            r.cancel(true);
                        }
                        if(fjp.getActiveThreadCount()>0 && fjp.getRunningThreadCount()>0)
                        {
                            fjp.shutdownNow();
                        }
                        fjp=new ForkJoinPool(Runtime.getRuntime().availableProcessors()*procnum);
                    } catch (InterruptedException e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                    } catch (ExecutionException e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                    }
                }
                results=new ArrayList<ForkJoinTask<String>>();
                if(parsedrows.size()>=commit_size)
                {
                    if(parsedrows.size()>=SPLITSIZE)
                    {
                        sendToDb(parsedrows,true);
                    }
                    else
                    {
                        sendToDb(parsedrows,false);
                    }
                    parsedrows=new ArrayList<String>();
                }

                //hint to the gc in case it actually pays off (think if i were a gambling man)
                System.gc();
                Runtime.getRuntime().gc();
            }

        }
        }
        log.info("REMAINING ROWS TO COMMIT "+parsedrows.size());
        log.info("Rows Left"+parsedrows.size());
        if(parsedrows.size()>0)
        {

            if(parsedrows.size()>=SPLITSIZE)
            {
                sendToDb(parsedrows,true);
            }
            else
            {
                sendToDb(parsedrows,false);
            }

            parsedrows=new ArrayList<String>();
        }
        records+=i;
        i=0;
//Query for more records to parse

看起来你正在为每个结果创建一个新的ForkJoinPool。你真正想做的是创建一个你的所有任务都将共享的ForkJoinPool。额外的池不会提供额外的并行性,所以一个应该没问题。当您获得准备运行的任务时,如果您已经在任务中,请带上您的 fjp 并调用fjp.execute(ForkJoinTask)ForkJoinTask.fork()

制作多个池似乎是一场簿记噩梦。尝试只使用一个共享的

你可能在 Java7 中使用 join()。加入不起作用。它需要上下文切换,而Java程序无法进行上下文切换,因此框架创建"延续线程"以保持移动。几年前,我在这篇文章中详细介绍了这个问题:ForkJoin Clamamity

相关内容

  • 没有找到相关文章

最新更新