调用celery.task.chunks的正确方法是什么



我正在使用Python3和芹菜进行可并行的任务。我喜欢把它分成几个部分,这样通过网络进行通信的成本就会节省下来。然而,芹菜文档没有透露关于如何调用结果块的足够详细信息。尝试了不同的方法,但没有像我预期的那样奏效。我的代码段如下:

@app.task(name='pl.startregret')
def startregret(**kwargs): 
    items = list(zip(range(1000), range(1000)))
    chunk = regretclick.chunks(items, 10)
    print(chunk)
    for c in chunk:
        print(c)

@app.task(name='pl.regretclick')
def regretclick(x,y):
    print('got it.')
    return x + y

我读了一些代码,认为代码中的那个块应该是生成器。但是,打印输出显示

[2014-10-15 13:12:15,930: WARNING/Worker-2] args
[2014-10-15 13:12:15,931: WARNING/Worker-2] subtask_type
[2014-10-15 13:12:15,931: WARNING/Worker-2] kwargs
[2014-10-15 13:12:15,931: WARNING/Worker-2] immutable
[2014-10-15 13:12:15,931: WARNING/Worker-2] options
[2014-10-15 13:12:15,931: WARNING/Worker-2] task

有什么关于正确调用块的建议吗?

谢谢,

更新:我已经阅读了源代码并尝试了chunk()。现在唯一的问题似乎是使用了默认队列,而不是在celeryconfig中定义的队列。

考虑这样一个简单的添加任务。

@app.task()
def add(x, y):
    return x + y

这里有一个调用区块任务的简单方法。

res = add.chunks(zip(range(10), range(10)), 2)()

这将给定的10个任务分块为每个大小为2的5个任务,并将chunked tasks添加到默认队列中。如果要将它路由到不同的队列,则必须在调用任务时指定它。

res = add.chunks(zip(range(10), range(10)), 2).apply_async(queue='my_special_queue')

然后启动该队列的工作程序以消耗任务

worker -A your_app worker -l info -Q my_special_queue

相关内容

最新更新