Multithreaded web site parser and the out-of-memory problem



Hi, I'm writing a web site parser. It is meant to be fast and is therefore multithreaded. The external libraries I use are the Apache HttpClient, Jsoup (for HTML parsing), and GPars (for message-driven threading). Below is an outline of the concept I'm trying to implement:

// imports needed by this snippet (the enclosing class declaration is omitted):
import java.io.IOException;
import java.net.ConnectException;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

import org.apache.http.client.ClientProtocolException;
import org.apache.http.client.HttpResponseException;
import org.apache.http.client.fluent.Async;
import org.apache.http.client.fluent.Content;
import org.apache.http.client.fluent.Request;
import org.apache.http.concurrent.FutureCallback;
import org.jsoup.Jsoup;

import groovyx.gpars.actor.Actor;
import groovyx.gpars.actor.StaticDispatchActor;

static StaticDispatchActor<String> httpActor;
    public static void main(String[] args) throws ClientProtocolException, IOException {
        int numThreads = 25;
        try{
            numThreads = Integer.parseInt(args[0]);
        } catch (Exception e){
            System.out.println("Number of threads defaulted to "+numThreads);
        }
        final int numberOfThreads = numThreads;
        final ExecutorService threadpool = Executors.newCachedThreadPool();
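        // Note: a cached thread pool grows without bound under load. Since the
        // numberOfThreads limit is already parsed, one alternative (an assumption,
        // not what the code above does) is to cap concurrency at the pool itself:
        // final ExecutorService threadpool = Executors.newFixedThreadPool(numberOfThreads);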
        final Async async = Async.newInstance().use(threadpool);
        AtomicInteger jobCount = new AtomicInteger(0);
//.....
// This is the parser itself: it extracts usernames from every page.
        Actor jsoupUser = new StaticDispatchActor<String>(){ // actor to parse users
            HashSet<String> users = new HashSet<>(); // found users
            @Override
            public void onMessage(String html){ // takes html -> adds parsed users to the set
                users.addAll(Jsoup.parse(html)
                    .select("a[href*=/profile/]").stream() // select links
                    .map(e -> e.text()) // extract usernames
                    .filter(s -> s.length() > 0) // empty lines -> out
                    .collect(Collectors.toSet()));
                System.out.print("Users: "+users.size()+", Jobs: "+jobCount.get()+"r");
            }
        }.start();
// This actor extracts new links to parse from every page
        Actor jsoupLinker = new StaticDispatchActor<String>(){ // link extractor
            HashSet<String> usedLinks = new HashSet<>(); // already found links
            @Override
            public void onMessage(String html) {
                Set<String> links = Jsoup.parse(html).select("a[href]").stream().parallel()
                    .map(e -> e.attr("href").replace("#comments", "")) // here also some other replacements...
                    .filter(s -> !usedLinks.contains(s) /* && other filters */)
                    .collect(Collectors.toSet());
                usedLinks.addAll(links); // remember them so the same URL is never queued twice
                links.forEach(url -> httpActor.send(url)); // send the new URLs off to be processed
            }
        }.start(); // start actor
// This actor processes the new links: it fetches each URL asynchronously, and it is where the error shows up:
httpActor = new StaticDispatchActor<String>(){ // process responses async
            @Override
            public void onMessage(String url) {
                try{
                while(jobCount.get()>numberOfThreads); // busy-wait until running jobs drop below the limit; without this the number of running jobs grows out of all control
                jobCount.incrementAndGet(); // count the job before submitting, so a fast callback cannot decrement first
                async.execute(Request.Get(defaultWebSiteUrl+url), new FutureCallback<Content>(){ @Override // do request and process async
                    public void completed(Content c) {
                        jobCount.decrementAndGet();
                        try{
                        String s = c.asString();
                        jsoupUser.send(s);
                        jsoupLinker.send(s);
                        } catch (OutOfMemoryError e1){
                            System.out.println("out of my memory, "); // This is the thrown error the question is about - [1]
                        }
                    }
            @Override public void failed(Exception e) {
                        jobCount.decrementAndGet();
                        try {
                            throw e;
                        } catch (ConnectException e4){ // if the request timed out, resend it
                            httpActor.send(url);
                            System.out.print("resent\r\n");
                        } catch (HttpResponseException e0){ // HTTP error statuses are deliberately ignored
                        } catch (Exception e1) { // for all other exceptions
                            e1.printStackTrace();
                        }
                    }
            @Override public void cancelled() {
                jobCount.decrementAndGet(); // never actually invoked
            }
        });
                } catch (IllegalArgumentException e3){
                    System.out.println("some illegal argument");
                }
            }
        };
        httpActor.start();

Now the problem is that even though I limit the number of running jobs, the code somehow still runs out of memory (search the code for [1] to see where). Maybe you have ideas on how to fix it, or can point to an example of a similar task done properly, because I may have gotten the whole application design wrong and should change it? Many thanks.

UPDATE: Thanks to BiziClop's hint I was able to track the error down. In case anyone is interested: as you can see above, I was sending the HTML received from the server to two different actors as a String and parsing it inside each of them. That was the cause of all the out-of-memory errors, because those HTML pages are quite large, especially considering how many of them sit in the message queues waiting to be processed. The solution I used is to parse the document once, select the required elements, and pass only those lists on to the matching actors for further processing:

Document doc = Jsoup.parse(c.asString());
jsoupUser.send(doc.select("a[href*=/profile/]"));
jsoupLinker.send(doc.select("a[href]"));
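One detail worth noting: doc.select(...) returns an org.jsoup.select.Elements (a list of matched elements), so the two actors have to be re-declared as StaticDispatchActor<Elements> for this to compile. A minimal sketch of the retyped user actor, keeping everything else from the code above:

import org.jsoup.select.Elements;

Actor jsoupUser = new StaticDispatchActor<Elements>(){ // now receives pre-selected profile links
    HashSet<String> users = new HashSet<>(); // found users
    @Override
    public void onMessage(Elements profileLinks){ // much smaller messages than whole pages
        profileLinks.stream()
            .map(e -> e.text())          // extract usernames
            .filter(s -> s.length() > 0) // drop empty strings
            .forEach(users::add);
        System.out.print("Users: "+users.size()+", Jobs: "+jobCount.get()+"\r");
    }
}.start();

The jsoupLinker actor changes the same way; the messages now carry only the pre-selected links rather than the raw HTML string.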

Still, if anyone has something to say about how to improve the algorithm, I'd really appreciate it.
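One concrete idea: the while(jobCount.get()>numberOfThreads); line spins a full CPU core while it waits. A counting java.util.concurrent.Semaphore gives the same backpressure while blocking cheaply. This is just a sketch of the idea against the code above, not a tested drop-in replacement:

import java.util.concurrent.Semaphore;

final Semaphore inFlight = new Semaphore(numberOfThreads); // at most numberOfThreads requests at once

// inside httpActor's onMessage(String url), replacing the busy-wait on jobCount:
inFlight.acquireUninterruptibly(); // blocks without spinning until a slot frees up
async.execute(Request.Get(defaultWebSiteUrl + url), new FutureCallback<Content>(){
    @Override public void completed(Content c) {
        inFlight.release(); // every terminal callback must release its slot
        // ... parse the document and dispatch the selected Elements as above ...
    }
    @Override public void failed(Exception e)  { inFlight.release(); }
    @Override public void cancelled()          { inFlight.release(); }
});

Blocking inside onMessage still stalls the actor exactly as the busy-wait does, just without burning a core; jobCount can then become purely informational.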
