我正在尝试根据用户按钮单击暂时暂停芹菜任务。
我所做的是:
当用户单击按钮时;我发布了一个 AJAX 请求,该请求将我的芹菜任务状态更新为"暂停">
然后,我的策略是;当我将一个任务初始化为芹菜时;它运行一个for循环。 每个 for 循环;我读取我的数据库"状态",看看它是否设置为暂停:如果设置为暂停;我想让它睡 60 秒或睡到用户点击恢复按钮;同样的想法。
这是我的代码:
r = redis.StrictRedis(host='localhost', port=6379, db=0)
@celery.task(bind=True)
def runTask(self, arr)
for items in arr:
current_task_id = self.request.id
item = r.get('celery-task-meta-'+current_task_id)
load_as_json = json.loads(item)
if "PAUSE" in load_as_json['status']:
sleep(50)
@app.route('/start')
def start_task()
runTask.apply_async(args=[arr])
return 'task started running
以下是我的暂停 API 端点的外观:
@app.route('/stop/<task_id>')
def updateTaskState():
task_id = request.cookie.get('task_id')
loadAsJson = json.loads(r.get('celery-task-meta-'+str(task_id)))
loadAsJson['status'] = 'PAUSE'
loadAsJson.update(loadAsJson)
dump_as_json = json.dumps(loadAsJson)
updated_state = r.set('celery-task-meta-'+last_key, dump_as_json)
return 'updated state';
根据我的概念理解;我看不到更新状态的原因是;任务已经执行并且无法从数据库中检索更新的值。 仅供参考:任务更新状态设置为立即暂停;我通过创建一个单独的脚本来检查 while 循环中的状态来检查这一点;每次我单击释放 AJAX 请求以更新状态的按钮时;我的数据库更新了,它在单独的脚本上显示"暂停";但是在 @celery.task 装饰器中,我似乎无法获得更新的状态。
下面是我用来测试的单独脚本; 它似乎是预期的更新状态;我只是无法在任务装饰器中获取更新的状态...古怪。
r = redis.StrictRedis(host='localhost', port=6379, db=0)
last_key = r.keys()
while True:
response = r.get('celery-task-meta-b1534a87-e18b-4f0a-89e2-08348d833056')
loadAsJson = json.loads(response)
print loadAsJson['status']
面对同样的问题,没有好的答案,我想出了你可能喜欢的解决方案,它不依赖于你正在使用的消息队列(又名 Redis 或 RabbitMQ)。对我来说,关键是 celery.app.task.Task 类中的 update_state 方法将task_id作为可选参数。就我而言,我通过多个工作器节点运行长时间运行的文件复制和校验和任务,有时用户希望暂停一个正在运行的任务以减少对存储的性能要求,以允许其他任务首先完成。我还运行一个无状态的 Flask REST API 来启动后端任务并检索正在运行的任务的状态,所以我需要一种方法来让 API 调用进来暂停和恢复任务。
这是我的测试函数,它可以通过监视自己的状态来接收"消息"以暂停自身:
celery.task(bind=True)
def long_test(self, i):
print('long test starting with delay of ' + str(i) + 'seconds on each loop')
print('task_id =' + str(self.request.id))
self.update_state(state='PROCESSING')
count = 0
while True:
task = celery.AsyncResult(self.request.id)
while task.state == 'PAUSING' or task.state == 'PAUSED':
if task.state == 'PAUSING':
self.update_state(state='PAUSED')
time.sleep(i)
if task.state == 'RESUME':
self.update_state(state='PROCESSING')
print('long test loop ' + str(count) + ' ' + str(task.state))
count += 1
time.sleep(i)
然后,为了暂停或恢复,我可以执行以下操作:
>>> from project.celeryworker.tasks import long_test
>>> from project import create_app, make_celery
>>> flaskapp = create_app()
>>> celery = make_celery(flaskapp)
>>> from celery.app.task import Task
>>> long_test.apply_async(kwargs={'i': 5})
<AsyncResult: bf19d50f-cf04-47f0-a069-6545fb253887>
>>> Task.update_state(self=celery, task_id='bf19d50f-cf04-47f0-a069-6545fb253887', state='PAUSING')
>>> celery.AsyncResult('bf19d50f-cf04-47f0-a069-6545fb253887').state
'PAUSED'
>>> Task.update_state(self=celery, task_id='bf19d50f-cf04-47f0-a069-6545fb253887', state='RESUME')
>>> celery.AsyncResult('bf19d50f-cf04-47f0-a069-6545fb253887').state
'PROCESSING'
>>> Task.update_state(self=celery, task_id='bf19d50f-cf04-47f0-a069-6545fb253887', state='PAUSING')
>>> celery.AsyncResult('bf19d50f-cf04-47f0-a069-6545fb253887').state
'PAUSED'