我想以标准的方式连锁芹菜任务。
我有一个json文件。在该文件中,有许多harcoded url。我需要删除那些链接,再加上删除在删除这些链接时发现的链接。
目前,我正在这样做。
for each_news_source, news_categories in rss_obj.iteritems():
for each_category in news_categories:
category = each_category['category']
rss_link = each_category['feed']
json_id = each_category['json']
try:
list_of_links = getrsslinks(rss_link)
for link in list_of_links:
scrape_link.delay(link, json_id, category)
except Exception,e:
print "Invalid url", str(e)
我想要getrsslinks
也是一个芹菜任务,然后getrsslinks
返回的URL列表的废弃也应该是另一个芹菜作业。
它遵循这种模式
harcodeJSONURL1--
--`getrsslinks` (celery task)
--scrap link 1 (celery task)
--scrap link 2 (celery task)
--scrap link 3 (celery task)
--scrap link 4 (celery task)
harcodeJSONURL2--
--`getrsslinks` (celery task)
--scrap link 1 (celery task)
--scrap link 2 (celery task)
--scrap link 3 (celery task)
--scrap link 4 (celery task)
等等。
我该怎么做
查看Celery中的子任务选项。在你的情况下,小组应该有所帮助。您只需要在getrsslinks
中调用一个scrape_link
组。
from celery import group
@app.task
def getrsslinks(rsslink, json_id, category):
# do processing
# Call scrape links
scrape_jobs = group(scrape_link.s(link, json_id, category) for link in link_list)
scrape_jobs.apply_async()
...
您可能希望getrsslinks
返回scrape_jobs
,以便更轻松地监视作业。然后在解析json文件时,您会像这样调用getrsslinks
。
for each_news_source, news_categories in rss_obj.iteritems():
for each_category in news_categories:
category = each_category['category']
rss_link = each_category['feed']
json_id = each_category['json']
getrsslinks.delay(rss_link, json_id, category)
最后,为了监视哪些链接无效(因为我们替换了try/except块),您需要存储所有getrsslinks
任务并观察成功或失败。您可以将apply_async
与link_error
一起使用。