我想将Celery用作URL抓取器。
我有一个 URL 列表,我必须在每个 URL 上执行 HTTP 请求并将结果写入一个文件(整个列表的相同文件(。
我的第一个想法是在 Celery 调用的任务中每n分钟节拍一次这样的代码:
@app.task
def get_urls(self):
results = [get_url_content.si(
url=url
) for url in urls]
ch = chain(
group(*results),
write_result_on_disk.s()
)
return ch()
这段代码运行良好,但存在 1 个问题:我有一千个 URL 要抓取,如果其中 1 个get_url_content失败,则不会调用write_result_on_disk,并且我们丢失了所有先前抓取的内容。
我想做的是通过拆分 URL 来对任务进行分块,获取它们的结果并将其写入磁盘。例如,20 个 url 的内容写入磁盘。
请问你有一个想法吗?我尝试了chunks()
功能,但没有得到真正有用的结果。
使用 CeleryBeat 执行类似 cron 的任务是个好主意。
我会尝试在您的get_url_content
微任务中捕获异常。当你抓住它们时,只需归还其他东西。这样,您可以在summarize_task中评估(例如计数,列出,检查(它们。
如何将块和链块用于另一个任务:
第 1 步:将区块转换为组:
如 http://docs.celeryproject.org/en/latest/userguide/canvas.html#chunks 中所述,.group()
将类型为celery.canvas.chunks
的对象转换为组,这是 Celery 中更常见的类型。
步骤 2:链接组和任务
http://docs.celeryproject.org/en/latest/userguide/canvas.html#the-primitives 中的"通过组合让你大吃一惊"部分提到:
将一个组与另一个任务链接在一起将自动升级 它是一个和弦
下面是包含这两个任务的一些代码以及我通常如何调用它们:
@app.task
def solve_micro_task(arg: str) -> dict:
...
@app.task
def summarize(items: List[List[dict]]):
flat_list = [item for sublist in items for item in sublist]
for report in flat_list:
...
chunk_tasks = solve_micro_task.chunks(<your iterator, e.g., a list>), 10) # type: celery.canvas.chunks
summarize_task = summarize.s()
chain(chunk_tasks.group(), summarize_task)()