芹菜使用一批消息执行任务



我想向芹菜发送消息,当它达到 100 条消息时,我希望芹菜批量执行它们。如果我想批量提交到数据库,这是一种常见情况。

为此,在谷歌搜索时,我发现了这个链接:用于用芹菜分批:http://celery.readthedocs.org/en/latest/reference/celery.contrib.batches.html

我的问题是,在示例中,没有明显的方法来获取提交到任务的数据

例如,假设我们一一提交一些消息:

task.apply_async((message,), link_error=error_handler.s())

然后我们有以下任务实现:

@celery.task(name="process.data", base=Batches, flush_every=100, flush_interval=1)
def process_messages(requests):
   for request in requests:
       print request /// how I can take the message data submitted in my task for process?

有没有其他方法可以用芹菜分批?谢谢

对于任何在多次试验和错误后发现这篇文章有用的人,我设法通过以下方式将数据从 SimplRequest 对象中取出:

当您通过以下方式提交数据时:

func.delay(data)

从请求对象中,您可以获得 args 属性,该属性是一个包含数据的列表:

request.args[0]
request.args[1] 
etc.

如果您通过以下方式提交数据:

func.apply_async((), {'data': data}, link_error=error_handler.s())

然后数据在 Kwargs 中作为字典提供:

request.kwargs['data']

最后,如示例所示,我们需要对所有请求执行循环以收集数据批处理

for r in requests:
       data = r.kwargs['data']
最好

用更简单明了的示例更新文档页面(此处)中的示例

https://github.com/celery/celery/blob/3.1/celery/contrib/batches.py 上可用的最后一个版本的batches.py在被弃用之前不适用于 Celery 5+/Python 3。

工作版本可以在 https://gist.github.com/robin-vjc/1a4676ccb055162082c5a061ab556f58

最新更新