这是我想要实现的逻辑:
1. Get a list of URLs by scraping a home page
2. Get, scrape and store a sublist of URLs in parallel by visiting each link in above list
我首先尝试创建一个主任务,该任务首先从主页上抓取所有URL,然后在for循环中获取子URL:
@task
def master_task():
urls = scrape_list_of_urls()
job = group([scrape_url_and_save.s(url) for url in urls]) # scrape_url takes around 200ms each URL, and there are thousands of URLs. Hence I want it to run in parallel
result = job.apply_async()
result.join()
@task
def scrape_url_and_save(url):
save_to_db(contents_of_url_by_scraping)
...
def scrape(request): # In Django
master_task.delay()
...
但这会导致错误:
运行时错误:永远不要在任务中调用 result.get((!
我在 Django 应用程序中使用 Celery 4。master_task
必须是一项任务,因为我不希望用户在抓取主页时等待。我不确定我的代码逻辑是否正确。更好的逻辑将不胜感激。
这是实现所需工作流程的方法 - 您的主任务应返回一组子任务,例如
@app.task(bind=True)
def master_task(self):
urls = scrape_list_of_urls()
job = group((self.app.signature('tasks_module.scrape_url_and_save', (url,)) for url in urls))
# run scrape_url_and_save in parallel e.g. using gevent/eventlet worker pool
return job.delay()
-
有类似的SO问题可以解决运行时错误的问题。问题链接
-
有什么原因,为什么这么复杂?我认为有这么多任务过于复杂。也许我错了,但要点 3.在您的列表中是多余的,无需一次将数据存储在数据库中。
因此,如果我是你,我会做一个master_task
,它将为每个将执行抓取工作的 urlscrape_url.delay(url)
生成,并将结果保存到数据库中。这将导致在队列中生成许多任务,然后并行处理它们将取决于辅助角色的数量。