Implementing a Multithreaded Web Crawler with Java's ReadWriteLocks



I am trying to implement a multithreaded web crawler using ReadWriteLocks. I have a Callable that calls an API to fetch a page's URLs and crawls each one if it is not already in the seen-URL set.

In the ExecutorService I use three threads to do the crawling.

The problem is that different threads end up reading the same URL twice. How can I prevent a thread from reading a URL that has already been visited?

package Threads;

import java.util.*;
import java.util.List;
import java.util.concurrent.*;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class WebCrawler {
    static HashSet<String> seenURL = new HashSet<>();
    List<String> resultVisitedUrls = new ArrayList<>();
    ReadWriteLock lock_http_request = new ReentrantReadWriteLock();
    Lock readLock_http_request = lock_http_request.readLock();
    Lock writeLock_http_request = lock_http_request.writeLock();

    public boolean contains(String url) {
        readLock_http_request.lock();
        try {
            return seenURL.contains(url);
        } finally {
            readLock_http_request.unlock();
        }
    }

    public void addUrlToSeenURLSet(String url) {
        writeLock_http_request.lock();
        try {
            seenURL.add(url);
        } finally {
            writeLock_http_request.unlock();
        }
    }

    public List<String> getResultVisitedUrls() {
        return resultVisitedUrls;
    }

    public void crawl(String startUrl, HtmlParser htmlParser, WebCrawler crawler) throws Exception {
        if (!crawler.contains(startUrl)) {
            try {
                crawler.addUrlToSeenURLSet(startUrl);
                List<String> subUrls = htmlParser.getUrls(startUrl);
                resultVisitedUrls.add(startUrl + "  Done by thread - " + Thread.currentThread());
                for (String subUrl : subUrls) {
                    crawl(subUrl, htmlParser, crawler);
                }
            } catch (Exception ex) {
                throw new Exception("Something went wrong. Method - crawl : " + ex.getMessage());
            }
        }
    }

    public static void main(String[] args) {
        class Crawl implements Callable<List<String>> {
            String startUrl;
            WebCrawler webCrawler;

            public Crawl(String startUrl, WebCrawler webCrawler) {
                this.startUrl = startUrl;
                this.webCrawler = webCrawler;
            }

            public List<String> call() {
                HtmlParser htmlParser = new RetrieveURLs();
                List<String> result = new ArrayList<>();
                try {
                    webCrawler.crawl(startUrl, htmlParser, webCrawler);
                    result = webCrawler.getResultVisitedUrls();
                } catch (Exception ex) {
                    System.err.println("Some exception occurred in run() - " + ex.getMessage());
                }
                return result;
            }
        }

        ExecutorService service = Executors.newFixedThreadPool(4);
        try {
            WebCrawler webCrawler = new WebCrawler();
            WebCrawler webCrawler1 = new WebCrawler();
            Future<List<String>> future_1 = service.submit(new Crawl("http://localhost:3001/getUrls/google.com", webCrawler));
            Future<List<String>> future_2 = service.submit(new Crawl("http://localhost:3001/getUrls/google.com", webCrawler1));
            Future<List<String>> future_3 = service.submit(new Crawl("http://localhost:3001/getUrls/google.com", webCrawler1));
            List<String> result_1 = future_1.get();
            List<String> result_2 = future_2.get();
            List<String> result_3 = future_3.get();
            result_1.addAll(result_2);
            result_2.addAll(result_3);
            //Assert.assertEquals(6, result_1.size());
            System.out.println(result_1.size());
            for (String str : result_1) {
                System.out.println(str);
            }
        } catch (ExecutionException | InterruptedException ex) {
        } finally {
            service.shutdown();
        }
    }
}

Your bug is that two threads can call contains(url) with the same value and both get false, so both fall through into the block that calls crawler.addUrlToSeenURLSet(startUrl). Note that seenURL.add(url) returns a boolean you are currently ignoring; that return value tells you which thread's add actually succeeded.
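To see the race concretely, here is one possible interleaving, sketched as comments (the thread names and URL are illustrative only):

// Both threads query the set before either has written to it:
// Thread-1: contains("a.com") -> false   (under the read lock)
// Thread-2: contains("a.com") -> false   (read locks do not exclude each other)
// Thread-1: addUrlToSeenURLSet("a.com")  (add returns true, ignored)
// Thread-2: addUrlToSeenURLSet("a.com")  (add returns false, also ignored)
// Result: both threads go on to crawl "a.com".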

Instead of using a pair of locks, use a concurrent set backed by a ConcurrentHashMap, which gives you thread safety for free.

private static final Set<String> seenURLs = ConcurrentHashMap.newKeySet(); 

With this set you only need to call add: it returns true on the first call, and false if the set already contains the same startUrl value that another thread is working on.

if(seenURLs.add(startUrl)) {
... crawl 
}
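For instance, here is a minimal sketch of crawl built around that set (it keeps your HtmlParser interface and field names, and assumes the rest of the class is unchanged; the read/write locks and the separate contains check go away):

private static final Set<String> seenURLs = ConcurrentHashMap.newKeySet();

public void crawl(String startUrl, HtmlParser htmlParser) throws Exception {
    // add is atomic: exactly one thread gets true for a given URL,
    // so the check and the insert can no longer interleave.
    if (!seenURLs.add(startUrl)) {
        return; // another thread already claimed this URL
    }
    List<String> subUrls = htmlParser.getUrls(startUrl);
    // Note: resultVisitedUrls is a plain ArrayList; if one WebCrawler
    // instance is shared between tasks it needs a thread-safe list too,
    // e.g. Collections.synchronizedList(new ArrayList<>()).
    resultVisitedUrls.add(startUrl + "  Done by thread - " + Thread.currentThread());
    for (String subUrl : subUrls) {
        crawl(subUrl, htmlParser);
    }
}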

Alternatively, if you prefer to keep the locks, change addUrlToSeenURLSet to return seenURL.add(url); so you can tell which thread succeeded, and then the only test you need before crawling is:

if(addUrlToSeenURLSet(startUrl)) {
... crawl 
}
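Put together, a sketch of that lock-based variant, using the same field names as your class:

public boolean addUrlToSeenURLSet(String url) {
    writeLock_http_request.lock();
    try {
        // true only for the single thread that actually inserted the URL;
        // the test and the insert now happen atomically under one write lock.
        return seenURL.add(url);
    } finally {
        writeLock_http_request.unlock();
    }
}

With this version the read lock and the contains method are no longer needed on the crawl path, since the write lock alone serializes the check-and-insert.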
