Perl 并行爬虫多线程



我有一个多线程的Perl爬虫,如果我在数组中声明URL,它可以正常工作。如果我从数据库读取 URL,我会收到"分段失败"错误。请帮我解决这个问题。谢谢

直接网址声明

use 5.012; use warnings;
use threads;
use Thread::Queue;
use LWP::UserAgent;
use constant THREADS => 10;
my $queue = Thread::Queue->new();
my @URLs =qw(http://www.example.com
http://www.example.com1
http://www.example.com2 );
print @URLs;
my @threads;
for (1..THREADS) {
    push @threads, threads->create(sub {
        my $ua = LWP::UserAgent->new;
        $ua->timeout(5); # short timeout for easy testing.
        while(my $task = $queue->dequeue) {
            my $response = eval{ $ua->get($task)->status_line };
            say "$task --> $response";
        }
    });
}
$queue->enqueue(@URLs);
$queue->enqueue(undef) for 1..THREADS;
# ... here work is done
$_->join foreach @threads;

尝试从数据库读取 URL

my $dbh = DBI->connect("DBI:mysql:$database;host=$server", $username, $password) # Get the rows from database
    || die "Could not connect to database: $DBI::errstr";
my $sth = $dbh->prepare('select cname,url,xpath,region from competitors')    #query to select required fields
    || die "$DBI::errstr";
$sth->execute();
if ($sth->rows < 0) {
    print "Sorry, no domains found.n";
}
else {                                                
    while (my $results = $sth->fetchrow_hashref) {
        my $competitor= $results->{cname};                      
        my $url = $results->{url};                         
        my $xpath = $results->{xpath};
        my $region = $results->{region};
        push(my @all,$url);   
        use constant THREADS => 10;
        my $queue = Thread::Queue->new();
        my @URLs=@all;
        my @threads;
        for (1..THREADS) {
            push @threads, threads->create(sub {
                my $ua = LWP::UserAgent->new;
                $ua->timeout(500); # short timeout for easy testing.
                while(my $task = $queue->dequeue) {
                    my $response = eval{ $ua->get($task)->status_line };
                    print  "$task --> $response";
                }
            });
        }
        $queue->enqueue( @URLs);
        $queue->enqueue(undef) for 1..THREADS;
        # ... here work is done
        $_->join foreach @threads;
    }
}  #close db
$sth->finish;
$dbh->disconnect;

预期盈亏

www.example.com-->200 ok
www.example.com1-->200 ok

当前负债

分段错误

当你创建线程时,你的$sth$dbh仍然存在,创建它们的副本,这是一个禁忌。

新创建的线程必须与数据库建立自己的连接。句柄不能跨线程共享。

更好地确定变量范围应该可以避免此问题。

use strict;
use warnings;
use threads;
use Thread::Queue 3.01 qw( );
use constant NUM_WORKERS => 10;
sub worker {
   my ($ua, $url) = @_;
   ...
}
{
   my $q = Thread::Queue->new();
   for (1..NUM_WORKERS) {
      async {
         my $ua = LWP::UserAgent->new();
         while ( my $url = $q->dequeue() ) {
            eval { worker($ua, $url); 1 }
               or warn $@;
         }
      };
   }
   {
      my $dbh = DBI->connect(..., { RaiseError => 1 });
      my $sth = $dbh->prepare('SELECT ...');
      $sth->execute();
      while ( my $row = $sth->fetchrow_hashref() ) {
         $q->enqueue($row->{url});
      }
   }
   $q->end();
   $_->join for threads->list;
}
你应该在

while 循环之外声明@all,然后,当 URL 被推送时,关闭该循环并继续

my $dbh = DBI->connect("DBI:mysql:$database;host=$server", $username, $password) # Get the rows from database
    || die "Could not connect to database: $DBI::errstr";
my $sth = $dbh->prepare('select cname,url,xpath,region from competitors')    #query to select required fields
    || die "$DBI::errstr";
$sth->execute();
# >> declare your URL-array before starting to fetch
my @URLs;
if ($sth->rows < 0) {
    print "Sorry, no domains found.n";
}
else {

    while (my $results = $sth->fetchrow_hashref) {
        my $competitor= $results->{cname};                      
        my $url = $results->{url};                         
        my $xpath = $results->{xpath};
        my $region = $results->{region};
        push(@URLs,$url);   
    }
}  
$sth->finish;
$dbh->disconnect;
use constant THREADS => 10;
my $queue = Thread::Queue->new();
my @threads;
for (1..THREADS) {
        push @threads, threads->create(sub {
        my $ua = LWP::UserAgent->new;
        $ua->timeout(500); # short timeout for easy testing.
        while(my $task = $queue->dequeue) {
            my $response = eval{ $ua->get($task)->status_line };
            print  "$task --> $response";
        }
    });
}
$queue->enqueue( @URLs);
$queue->enqueue(undef) for 1..THREADS;
# ... here work is done
$_->join foreach @threads;
由于

perl代码,Segfaults非常罕见。它们与内存相关,通常意味着外部二进制文件中存在问题。(我会在这里押注DBI)

特别是线程有很多遗留问题 - 不过它们在较新版本的perl中变得越来越好。我强烈建议你考虑升级到最新版本的perl,如果你还没有的话。我知道这并不总是一个选择,但这是一个好主意。

真的

很难猜测你的问题,因为我没有你的数据库,所以我无法重新创建它。

我建议通常可以做一些事情来保持线程"干净" - 您的代码工作方式是数据库句柄在线程的范围内。我会避免这样做。在顶部声明线程子,范围尽可能窄。

不过我会注意到:

push ( my @all, $url ); 

可能没有按照你的想法去做!

但是,是的,拿你的代码,我会这样说:

#!/usr/bin/perl
use strict;
use warnings;
use threads;
use Thread::Queue;
use LWP;
my $num_threads = 10;
my $work_q = Thread::Queue->new();
sub worker {
    my $ua = LWP::UserAgent->new;
    $ua->timeout(500);    # short timeout for easy testing.
    while ( my $task = $work_q->dequeue ) {
        my $response = eval { $ua->get($task)->status_line };
        print "$task --> $response";
    }
}

## fetch_list
sub fetch_url_list {
    my $dbh = DBI->connect( "DBI:mysql:$database;host=$server",
        $username, $password )    # Get the rows from database
        || die "Could not connect to database: $DBI::errstr";
    my $sth =
        $dbh->prepare( 'select cname,url,xpath,region from competitors'
        )                         #query to select required fields
        || die "$DBI::errstr";
    $sth->execute();

    if ( $sth->rows < 0 ) {
        print "Sorry, no domains found.n";
    }
    else {
        while ( my $results = $sth->fetchrow_hashref ) {
            my $competitor = $results->{cname};
            my $url        = $results->{url};
            my $xpath      = $results->{xpath};
            my $region     = $results->{region};
            $work_q -> enqueue ( $url );
        }
    }
    $sth->finish;
    $dbh->disconnect;
}
for ( 1 .. $num_threads ) {
    threads->create( &worker );
}
fetch_url_list();
$work_q->end;
foreach my $thr ( threads->list() ) {
    $thr->join();
}

这样 - 您的线程都没有"范围内"的数据库内容,并且数据库在范围内没有线程内容。这减少了"污染"给你带来问题的几率。特别是 - 线程在开始"复制"当前范围内的所有内容时,当它们是对象时,它们可以做非常奇怪的事情。(例如,数据库句柄)

如果做不到这一点,我会考虑考虑一种"分叉"方法。线程擅长来回传递数据,但是当你不需要来回传递数据时(而且你不需要,你只是在运行测试并打印结果),分叉通常更有效(绝对是在基于 Unix 的系统上)。

最新更新