How to requeue a message when an error occurs in the spider, using Scrapy and RabbitMQ (pika)



I am trying to run an MQ setup with pika and Scrapy, where the consumer invokes the spider. I have a consumer.py and a Scrapy spider spider.py.

The spider runs inside the consumer, with the arguments sent from the producer. I use used_channel.basic_ack(delivery_tag=basic_deliver.delivery_tag) to delete the message.
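
The producer itself is not shown here; for context, it publishes the spider's argument to the same exchange and routing key, roughly like this (just a sketch, the message body and delivery_mode are illustrative):

import pika

# sketch of the producer side; rabbit_host, rabbit_exchange and routing_key
# are the same values the consumer below uses
connection = pika.BlockingConnection(pika.ConnectionParameters(host=rabbit_host))
channel = connection.channel()
channel.basic_publish(
    exchange=rabbit_exchange,
    routing_key=routing_key,
    body='argument for the spider',
    properties=pika.BasicProperties(delivery_mode=2),  # persistent message
)
connection.close()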

I expect the message to be deleted once the spider has finished its job, and to be requeued if an error occurs. Everything looks fine when the spider runs normally: the message is deleted and the job gets done. However, if an error occurs while the spider is running, the message is still deleted even though the job is not finished, so the message is lost.

Watching the RabbitMQ management UI, I saw the message count drop to 0 while the spider was still running (the console had not yet shown that the job was finished).

I wonder whether this is because Scrapy is asynchronous, so that while the line run_spider(message=decodebody) is still running, the next line used_channel.basic_ack(delivery_tag=basic_deliver.delivery_tag) does not wait for the spider to finish.
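
If that is the case, my understanding is that CrawlerRunner.crawl() only schedules the crawl and immediately returns a Twisted Deferred that fires once the crawl is finished, roughly like this (a minimal sketch, using the same settings and MySpider as in my consumer below):

from scrapy.crawler import CrawlerRunner
from scrapy.utils.project import get_project_settings

settings = get_project_settings()

def run_spider(message):
    crawler = CrawlerRunner(settings)
    # crawl() schedules the spider on the Twisted reactor and returns a
    # Deferred right away; it does not block until the spider is done, so
    # anything after run_spider() (e.g. basic_ack) runs immediately
    d = crawler.crawl(MySpider, message=message)  # MySpider is my spider class (spider.py)
    return d

If that reading is right, the ack really does fire before the crawl has completed.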

How can I solve this? I want the message to be deleted only after the spider has finished its job properly.

import pika
from scrapy.crawler import CrawlerRunner
from scrapy.utils.project import get_project_settings

# MySpider, rabbit_host, rabbit_exchange, rabbit_queue, routing_key and logger
# are defined/imported elsewhere in my project
setup()  # for CrawlerRunner
settings = get_project_settings()

def get_message(used_channel, basic_deliver, properties, body):
    decodebody = bytes.decode(body)
    try:
        run_spider(message=decodebody)
        used_channel.basic_ack(delivery_tag=basic_deliver.delivery_tag)
    except Exception:
        used_channel.basic_reject(delivery_tag=basic_deliver.delivery_tag)

def run_spider(message):
    crawler = CrawlerRunner(settings)
    crawler.crawl(MySpider, message=message)

while True:
    try:
        # blocking connection
        connection = pika.BlockingConnection(pika.ConnectionParameters(host=rabbit_host))
        channel = connection.channel()
        # declare exchange, the settings must be the same as the producer's
        channel.exchange_declare(
            exchange=rabbit_exchange,
            exchange_type='direct',
            durable=True,
            auto_delete=False
        )
        # declare queue, the settings must be the same as the producer's
        channel.queue_declare(
            queue=rabbit_queue,
            durable=True,
            exclusive=False,
            auto_delete=False
        )
        # bind the queue to the exchange
        channel.queue_bind(
            exchange=rabbit_exchange,
            queue=rabbit_queue,
            routing_key=routing_key
        )
        channel.basic_qos(prefetch_count=1)
        channel.basic_consume(
            queue=rabbit_queue,
            on_message_callback=get_message,
            auto_ack=False
        )
        logger.info(' [*] Waiting for messages. To exit press CTRL+C')
        # start consuming messages (each message triggers get_message)
        channel.start_consuming()

    except pika.exceptions.ConnectionClosed as err:
        print('ConnectionClosed error:', err)
        continue
    # Do not recover on channel errors
    except pika.exceptions.AMQPChannelError as err:
        print("Caught a channel error: {}, stopping...".format(err))
        break
    # Recover on all other connection errors
    except pika.exceptions.AMQPConnectionError as err:
        print("Connection was closed, retrying...", err)
        continue
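
For the error path, my intention is that a failed crawl should put the message back on the queue. As far as I can tell pika's basic_reject requeues by default, but spelled out explicitly the call I mean is something like this (reject_and_requeue is just a hypothetical helper for illustration):

def reject_and_requeue(used_channel, basic_deliver):
    # hypothetical helper: ask the broker to put the message back on the
    # queue so it can be picked up and retried later
    used_channel.basic_reject(
        delivery_tag=basic_deliver.delivery_tag,
        requeue=True,
    )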

I found someone handling multithreading with MQ and the pika library. He uses .is_alive to check whether the thread has finished, so I followed that idea: since the Scrapy crawl also runs in the background, I added return crawler and check crawler._active before deleting the message, as shown below.

Source code of scrapy.crawler (where the _active set comes from):

def run_spider(news_info):
    # run spider with CrawlerRunner
    crawler = CrawlerRunner(settings)
    # run the spider script
    crawler.crawl(UrlSpider, news_info=news_info)
    return crawler

crawler = run_spider(news_info=decodebody)

# wait until the crawler is done
while len(crawler._active) > 0:
    time.sleep(1)

used_channel.basic_ack(delivery_tag=basic_deliver.delivery_tag)
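
Polling crawler._active feels fragile since it is an internal attribute, so I am also considering waiting on the Deferred that crawl() returns instead. Assuming the setup() call in my consumer is crochet's setup(), crochet's wait_for could block the pika callback until the crawl finishes and only then ack, roughly like this (a sketch, not tested; settings and UrlSpider as above):

from crochet import setup, wait_for
from scrapy.crawler import CrawlerRunner
from scrapy.utils.project import get_project_settings

setup()  # runs the Twisted reactor in a background thread (crochet)
settings = get_project_settings()

@wait_for(timeout=600.0)  # block the caller until the Deferred fires, or raise on timeout
def run_spider(news_info):
    crawler = CrawlerRunner(settings)
    # return the Deferred from crawl(); wait_for waits on it and re-raises
    # any failure in the calling thread
    return crawler.crawl(UrlSpider, news_info=news_info)  # UrlSpider is my spider class

def get_message(used_channel, basic_deliver, properties, body):
    decodebody = bytes.decode(body)
    try:
        run_spider(news_info=decodebody)  # returns only after the crawl is done
        used_channel.basic_ack(delivery_tag=basic_deliver.delivery_tag)
    except Exception:
        # crawl failed to complete (or timed out): put the message back on the queue
        used_channel.basic_reject(delivery_tag=basic_deliver.delivery_tag, requeue=True)

One caveat I am aware of: exceptions raised inside the spider's own callbacks are caught and logged by Scrapy, so they do not necessarily make this Deferred fail; only errors in starting or running the crawl itself (or a timeout) would end up in the except branch.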
