将链条、组和块与芹菜相结合



我想将Celery用作URL抓取器。

我有一个 URL 列表,我必须在每个 URL 上执行 HTTP 请求并将结果写入一个文件(整个列表的相同文件(。

我的第一个想法是在 Celery 调用的任务中每n分钟节拍一次这样的代码:

@app.task
def get_urls(self):
results = [get_url_content.si(
url=url
) for url in urls]
ch = chain(
group(*results),
write_result_on_disk.s()
)
return ch()

这段代码运行良好,但存在 1 个问题:我有一千个 URL 要抓取,如果其中 1 个get_url_content失败,则不会调用write_result_on_disk,并且我们丢失了所有先前抓取的内容。

我想做的是通过拆分 URL 来对任务进行分块,获取它们的结果并将其写入磁盘。例如,20 个 url 的内容写入磁盘。

请问你有一个想法吗?我尝试了chunks()功能,但没有得到真正有用的结果。

使用 CeleryBeat 执行类似 cron 的任务是个好主意。

我会尝试在您的get_url_content微任务中捕获异常。当你抓住它们时,只需归还其他东西。这样,您可以在summarize_task中评估(例如计数,列出,检查(它们。

如何将块和链用于另一个任务:

第 1 步:将区块转换为组:

如 http://docs.celeryproject.org/en/latest/userguide/canvas.html#chunks 中所述,.group()将类型为celery.canvas.chunks的对象转换为组,这是 Celery 中更常见的类型。

步骤 2:链接组和任务

http://docs.celeryproject.org/en/latest/userguide/canvas.html#the-primitives 中的"通过组合让你大吃一惊"部分提到:

将一个组与另一个任务链接在一起将自动升级 它是一个和弦


下面是包含这两个任务的一些代码以及我通常如何调用它们:

@app.task
def solve_micro_task(arg: str) -> dict:
...
@app.task
def summarize(items: List[List[dict]]):
flat_list = [item for sublist in items for item in sublist]
for report in flat_list:
...
chunk_tasks = solve_micro_task.chunks(<your iterator, e.g., a list>), 10)  # type: celery.canvas.chunks
summarize_task = summarize.s()
chain(chunk_tasks.group(), summarize_task)()

相关内容

  • 没有找到相关文章

最新更新