多线程foreach hashmap循环



我存储在JSoup的帮助下检索到的数据,并将它们提交到我自己的HTTP API。

问题:我如何迭代我的HashMap与多线程,没有每个线程处理我的HashMap的相同值,就像它是目前的情况。

其实

:

Thread1: a  
Thread2: a  
Thread3: a  
Thread4: a  
Thread1: b  
Thread2: b  
Thread3: b  
Thread4: b  

我想要这样的东西:

Thread1 : a 
Thread2 : b
Thread3 : c
Thread4 : d
package ygg.org;
import java.io.IOException;
import java.net.URLEncoder;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Hashtable;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.jsoup.Jsoup;
import org.jsoup.nodes.Document;
import org.jsoup.nodes.Element;
import org.jsoup.select.Elements;
public class Filmstreaming1 {
    final static int NB_PAGE = 2;
    final static int THREADS = 1;
    static ConcurrentHashMap<String, String> movies_list = new ConcurrentHashMap<>();
    static int count = 0;
    static void Initialize() {
        System.out.println("----------------------------------");
        System.out.println("Homer is starting...");
        System.out.println("------------------------------");
        for (int i = 1 ; i <= NB_PAGE ; i++) {
            try {
                Document page = Jsoup.connect("http://xxxxxxx.com/page/" + i + "/")
                                    .userAgent("Mozilla")
                                    .timeout(3000)
                                    .post();
                Elements movies = page.getElementsByClass("margin-b40").get(0).getElementsByClass("short-link").select("a");
                for (Element movie : movies) {
                    String href = movie.attr("href");
                    String movie_title = movie.text().replaceAll("\(.*\)", "");
                    boolean isMovieExists = movies_list.containsKey(href);
                    if (isMovieExists == false) {
                        movies_list.put(href, movie_title);
                        System.out.println("Ajout du film " + movie_title);
                    }                          
                }
                System.out.println("Total récupérés " + movies_list.size() + " page : " + i);
            } catch(IOException ioe) {
                System.out.println("Exception: " + ioe);
            }
        }
        try {
            for (int i = 0; i <= THREADS; i++) {
                Thread api = new ThreadApi();
                api.start();
            }
        } catch(Exception e) {
            System.out.println("Exception: " + e.getMessage());
        }
    }
}
class ThreadApi extends Thread {
    public void  run() {
        while(true) {
            Filmstreaming1.movies_list.forEach((key, value) -> {
                try {
                    String code = key.substring(key.indexOf("com/") + 4, key.indexOf("-"));
                    Document page = Jsoup.connect("http://xxxxxxx.com/" + code + "--.html")
                                                .userAgent("Mozilla")
                                                .timeout(3000)
                                                .post();
                    String director = page.getElementsByClass("finfo-text").get(5).text().toString();
                    Document page1 = Jsoup.connect("http://xxxxxxx.com/play.php?newsid=" + code + "&vt=ol&sr=3")
                                                .referrer("http://xxxxxxx.com/" + code + "--.html")
                                                .userAgent("Mozilla")
                                                .timeout(3000)
                                                .post();

                    String link = page1.getElementsByTag("iframe").first().attr("src").toString();
                    String encoded_title = URLEncoder.encode((String) value, "UTF-8");
                    String encoded_director = URLEncoder.encode((String) director, "UTF-8");
                    String url = "http://xxxxxxx.com/api/movie?movie=" + encoded_title + "&director=" + encoded_director;
                    // On affiche l'url 
                    System.out.println(url);
                    Document api = Jsoup.connect(url)
                                                .userAgent("Mozilla") 
                                                .timeout(3000)
                                                .get();
                    String response = api.text();
                    System.out.println(response);
                    if (response == "-1") {
                        System.out.println("Erreur");       
                    } else {
                        url = "http://xxxxxxx.com/api/video?link=" + link + "&ref=" + response + "&version=vf";             
                        Document submit = Jsoup.connect(url)
                                                    .userAgent("Mozilla") 
                                                    .timeout(3000)
                                                    .get();
                        response = submit.text();
                        Filmstreaming1.movies_list.remove(key);
                        System.out.println(response);
                    }
                } catch(Exception e) {
                        System.out.println("Exception " + e.getMessage());
                }
            });
        }
    }
}

由于您的Map已经是ConcurrentHashMap,您可以使用ConcurrentHashMap。forEach -这允许配置一个paralleslismThreshold,它可以在超过阈值时自动并行执行调用。

文档对阈值参数的影响有如下说明:

这些批量操作接受parallelmthreshold参数。如果估计当前映射大小小于给定阈值,则方法依次进行。使用Long值。MAX_VALUE抑制所有并行性。使用1的值可以通过划分足够多的子任务来充分利用用于所有并行计算的ForkJoinPool.commonPool(),从而获得最大的并行性。通常,您将首先选择这些极值中的一个,然后使用中间值来衡量性能,以权衡开销与吞吐量。

因此不需要创建自己的Thread甚至Runnable实现,任何方法引用或lambda作为BiConsumer<? super K,? super V>都可以。

最新更新