在芹菜任务中运行一个碎屑蜘蛛



这不再有效,scrapy的API已经更改。

现在文档提供了一种"从脚本运行Scrapy"的方法,但我得到了ReactorNotRestartable错误。

我的任务:

from celery import Task
from twisted.internet import reactor
from scrapy.crawler import Crawler
from scrapy import log, signals
from scrapy.utils.project import get_project_settings
from .spiders import MySpider

class MyTask(Task):
def run(self, *args, **kwargs):
spider = MySpider
settings = get_project_settings()
crawler = Crawler(settings)
crawler.signals.connect(reactor.stop, signal=signals.spider_closed)
crawler.configure()
crawler.crawl(spider)
crawler.start()
log.start()
reactor.run()

扭曲电抗器无法重新启动。对此的一个解决方案是,让芹菜任务为您想要执行的每个爬网生成一个新的子进程,如下文所述:

  • 在芹菜任务中运行碎屑蜘蛛

这绕过了";反应堆不能重新启动";通过使用CCD_ 2包发布。但问题是,最新的芹菜版本现在已经过时了,因为您将遇到另一个问题,即守护进程无法生成子进程。因此,为了使变通方法发挥作用,您需要使用芹菜版本。

是,并且scrapyAPI已更改。但有一些小的修改(import Crawler而不是CrawlerProcess)。您可以通过使用芹菜版本来获得解决方法。

芹菜问题可以在这里找到:芹菜问题#1709

这是我的更新的爬网脚本,它通过使用billiard而不是multiprocessing:与较新的芹菜版本一起工作

from scrapy.crawler import Crawler
from scrapy.conf import settings
from myspider import MySpider
from scrapy import log, project
from twisted.internet import reactor
from billiard import Process
from scrapy.utils.project import get_project_settings
from scrapy import signals

class UrlCrawlerScript(Process):
def __init__(self, spider):
Process.__init__(self)
settings = get_project_settings()
self.crawler = Crawler(settings)
self.crawler.configure()
self.crawler.signals.connect(reactor.stop, signal=signals.spider_closed)
self.spider = spider
def run(self):
self.crawler.crawl(self.spider)
self.crawler.start()
reactor.run()
def run_spider(url):
spider = MySpider(url)
crawler = UrlCrawlerScript(spider)
crawler.start()
crawler.join()

编辑:通过阅读芹菜问题#1709,他们建议使用台球而不是多处理,以取消子流程限制。换句话说,我们应该试试台球,看看它是否有效!

编辑2:是的,通过使用台球,我的脚本可以使用最新的芹菜构建!请参阅我更新的脚本。

Twisted reactor无法重新启动,因此一旦一个spider完成运行,crawler隐式停止reactor,该工作程序就没有用了。

正如另一个问题的答案所示,你所需要做的就是杀死运行你的蜘蛛的工人,并用一个新的蜘蛛代替它,这样可以防止反应堆多次启动和停止。要做到这一点,只需设置:

CELERYD_MAX_TASKS_PER_CHILD = 1

不利的一面是,你并没有真正充分利用Twisted reactor的潜力,并浪费资源运行多个反应器,因为一个反应器可以在一个进程中同时运行多个蜘蛛。一个更好的方法是每个工人运行一个反应堆(甚至是全球一个反应堆),不要让crawler接触它

我正在为一个非常相似的项目做这件事,所以如果我有任何进展,我会更新这篇文章。

为了避免在Celery任务队列中运行Scrapy时出现ReactorNotRestartable错误,我使用了线程。同样的方法用于在一个应用程序中多次运行Twisted reactor。Scrapy也使用了Twisted,所以我们可以用同样的方法。

这是代码:

from threading import Thread
from scrapy.crawler import CrawlerProcess
import scrapy
class MySpider(scrapy.Spider):
name = 'my_spider'

class MyCrawler:
spider_settings = {}
def run_crawler(self):
process = CrawlerProcess(self.spider_settings)
process.crawl(MySpider)
Thread(target=process.start).start()

不要忘记增加芹菜的CELERYD_CONCURRENCY。

CELERYD_CONCURRENCY = 10

对我来说很好。

这并不是阻止进程运行,但无论如何,最好的做法是在回调中处理数据。只需这样做:

for crawler in process.crawlers:
crawler.spider.save_result_callback = some_callback
crawler.spider.save_result_callback_params = some_callback_params
Thread(target=process.start).start()

以下是对我有效的方法,灵感来自这个答案:

from scrapy.settings import Settings
from scraper.scraper import settings as scraper_settings
from celery import signals, Celery
import os
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "main.settings")
app = Celery("enrichment")
@signals.worker_process_init.connect
def configure_infrastructure(**kwargs):
from twisted.internet import asyncioreactor
asyncioreactor.install() # TWISTED_REACTOR setting in scraper/scraper/settings.py
from crochet import setup
setup()

@app.task()
def do_scraping():
crawler_settings = Settings()
crawler_settings.setmodule(scraper_settings)
runner = CrawlerRunner(settings=crawler_settings)
runner.crawl("spider_name", url="some_url")

如果有很多任务要处理,那么这种方法的效率非常低。因为Celery是线程化的——在自己的线程中运行每个任务。假设使用RabbitMQ作为代理,您可以通过>10K q/s。对于Celery,这可能会导致10K的线程开销!我建议这里不要用芹菜。而是直接访问代理!

相关内容

  • 没有找到相关文章

最新更新