我有一个多线程的Perl爬虫,如果我在数组中声明URL,它可以正常工作。如果我从数据库读取 URL,我会收到"分段失败"错误。请帮我解决这个问题。谢谢
直接网址声明
use 5.012; use warnings;
use threads;
use Thread::Queue;
use LWP::UserAgent;
use constant THREADS => 10;
my $queue = Thread::Queue->new();
my @URLs =qw(http://www.example.com
http://www.example.com1
http://www.example.com2 );
print @URLs;
my @threads;
for (1..THREADS) {
push @threads, threads->create(sub {
my $ua = LWP::UserAgent->new;
$ua->timeout(5); # short timeout for easy testing.
while(my $task = $queue->dequeue) {
my $response = eval{ $ua->get($task)->status_line };
say "$task --> $response";
}
});
}
$queue->enqueue(@URLs);
$queue->enqueue(undef) for 1..THREADS;
# ... here work is done
$_->join foreach @threads;
尝试从数据库读取 URL
my $dbh = DBI->connect("DBI:mysql:$database;host=$server", $username, $password) # Get the rows from database
|| die "Could not connect to database: $DBI::errstr";
my $sth = $dbh->prepare('select cname,url,xpath,region from competitors') #query to select required fields
|| die "$DBI::errstr";
$sth->execute();
if ($sth->rows < 0) {
print "Sorry, no domains found.n";
}
else {
while (my $results = $sth->fetchrow_hashref) {
my $competitor= $results->{cname};
my $url = $results->{url};
my $xpath = $results->{xpath};
my $region = $results->{region};
push(my @all,$url);
use constant THREADS => 10;
my $queue = Thread::Queue->new();
my @URLs=@all;
my @threads;
for (1..THREADS) {
push @threads, threads->create(sub {
my $ua = LWP::UserAgent->new;
$ua->timeout(500); # short timeout for easy testing.
while(my $task = $queue->dequeue) {
my $response = eval{ $ua->get($task)->status_line };
print "$task --> $response";
}
});
}
$queue->enqueue( @URLs);
$queue->enqueue(undef) for 1..THREADS;
# ... here work is done
$_->join foreach @threads;
}
} #close db
$sth->finish;
$dbh->disconnect;
预期盈亏
www.example.com-->200 ok
www.example.com1-->200 ok
当前负债
分段错误
当你创建线程时,你的$sth
和$dbh
仍然存在,创建它们的副本,这是一个禁忌。
新创建的线程必须与数据库建立自己的连接。句柄不能跨线程共享。
更好地确定变量范围应该可以避免此问题。
use strict;
use warnings;
use threads;
use Thread::Queue 3.01 qw( );
use constant NUM_WORKERS => 10;
sub worker {
my ($ua, $url) = @_;
...
}
{
my $q = Thread::Queue->new();
for (1..NUM_WORKERS) {
async {
my $ua = LWP::UserAgent->new();
while ( my $url = $q->dequeue() ) {
eval { worker($ua, $url); 1 }
or warn $@;
}
};
}
{
my $dbh = DBI->connect(..., { RaiseError => 1 });
my $sth = $dbh->prepare('SELECT ...');
$sth->execute();
while ( my $row = $sth->fetchrow_hashref() ) {
$q->enqueue($row->{url});
}
}
$q->end();
$_->join for threads->list;
}
while 循环之外声明@all,然后,当 URL 被推送时,关闭该循环并继续
my $dbh = DBI->connect("DBI:mysql:$database;host=$server", $username, $password) # Get the rows from database
|| die "Could not connect to database: $DBI::errstr";
my $sth = $dbh->prepare('select cname,url,xpath,region from competitors') #query to select required fields
|| die "$DBI::errstr";
$sth->execute();
# >> declare your URL-array before starting to fetch
my @URLs;
if ($sth->rows < 0) {
print "Sorry, no domains found.n";
}
else {
while (my $results = $sth->fetchrow_hashref) {
my $competitor= $results->{cname};
my $url = $results->{url};
my $xpath = $results->{xpath};
my $region = $results->{region};
push(@URLs,$url);
}
}
$sth->finish;
$dbh->disconnect;
use constant THREADS => 10;
my $queue = Thread::Queue->new();
my @threads;
for (1..THREADS) {
push @threads, threads->create(sub {
my $ua = LWP::UserAgent->new;
$ua->timeout(500); # short timeout for easy testing.
while(my $task = $queue->dequeue) {
my $response = eval{ $ua->get($task)->status_line };
print "$task --> $response";
}
});
}
$queue->enqueue( @URLs);
$queue->enqueue(undef) for 1..THREADS;
# ... here work is done
$_->join foreach @threads;
perl代码,Segfaults非常罕见。它们与内存相关,通常意味着外部二进制文件中存在问题。(我会在这里押注DBI)
特别是线程有很多遗留问题 - 不过它们在较新版本的perl中变得越来越好。我强烈建议你考虑升级到最新版本的perl,如果你还没有的话。我知道这并不总是一个选择,但这是一个好主意。
真的很难猜测你的问题,因为我没有你的数据库,所以我无法重新创建它。
我建议通常可以做一些事情来保持线程"干净" - 您的代码工作方式是数据库句柄在线程的范围内。我会避免这样做。在顶部声明线程子,范围尽可能窄。
不过我会注意到:
push ( my @all, $url );
可能没有按照你的想法去做!
但是,是的,拿你的代码,我会这样说:
#!/usr/bin/perl
use strict;
use warnings;
use threads;
use Thread::Queue;
use LWP;
my $num_threads = 10;
my $work_q = Thread::Queue->new();
sub worker {
my $ua = LWP::UserAgent->new;
$ua->timeout(500); # short timeout for easy testing.
while ( my $task = $work_q->dequeue ) {
my $response = eval { $ua->get($task)->status_line };
print "$task --> $response";
}
}
## fetch_list
sub fetch_url_list {
my $dbh = DBI->connect( "DBI:mysql:$database;host=$server",
$username, $password ) # Get the rows from database
|| die "Could not connect to database: $DBI::errstr";
my $sth =
$dbh->prepare( 'select cname,url,xpath,region from competitors'
) #query to select required fields
|| die "$DBI::errstr";
$sth->execute();
if ( $sth->rows < 0 ) {
print "Sorry, no domains found.n";
}
else {
while ( my $results = $sth->fetchrow_hashref ) {
my $competitor = $results->{cname};
my $url = $results->{url};
my $xpath = $results->{xpath};
my $region = $results->{region};
$work_q -> enqueue ( $url );
}
}
$sth->finish;
$dbh->disconnect;
}
for ( 1 .. $num_threads ) {
threads->create( &worker );
}
fetch_url_list();
$work_q->end;
foreach my $thr ( threads->list() ) {
$thr->join();
}
这样 - 您的线程都没有"范围内"的数据库内容,并且数据库在范围内没有线程内容。这减少了"污染"给你带来问题的几率。特别是 - 线程在开始"复制"当前范围内的所有内容时,当它们是对象时,它们可以做非常奇怪的事情。(例如,数据库句柄)
如果做不到这一点,我会考虑考虑一种"分叉"方法。线程擅长来回传递数据,但是当你不需要来回传递数据时(而且你不需要,你只是在运行测试并打印结果),分叉通常更有效(绝对是在基于 Unix 的系统上)。