Falcon Python example with Celery


import falcon
import json
from tasks import add
from waitress import serve

class TasksResource:
    def on_get(self, req, resp):
        """Handles GET requests"""
        result = add.delay(1, 2)
        # return the task id and whether the result is ready yet
        context = {'ID': result.id, 'final result': result.ready()}
        resp.body = json.dumps(context)

api = falcon.API()
api.add_route('/result', TasksResource())
# api.add_route('/result/task', taskresult())
if __name__ == '__main__':
    serve(api, host='127.0.0.1', port=5555)

How do I get the task ID from the JSON payload (POST data) and add a route for it?

Here is a small example. File structure:

/project
    __init__.py
    app.py    # routes, falcon etc.
    tasks.py  # celery
example.py    # script demonstrating how it works

app.py:

import json
import falcon
from tasks import add
from celery.result import AsyncResult

class StartTask(object):
    def on_get(self, req, resp):
        # start task
        task = add.delay(4, 4)
        resp.status = falcon.HTTP_200
        # return task_id to client
        result = {'task_id': task.id}
        resp.body = json.dumps(result)

class TaskStatus(object):
    def on_get(self, req, resp, task_id):
        # get result of task by task_id and generate content to client
        task_result = AsyncResult(task_id)
        result = {'status': task_result.status, 'result': task_result.result}
        resp.status = falcon.HTTP_200
        resp.body = json.dumps(result)

app = falcon.API()
# registration of routes
app.add_route('/start_task', StartTask())
app.add_route('/task_status/{task_id}', TaskStatus())
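The question also asked how to take the task_id from a JSON payload (POST data) rather than from the URL. As a hedged sketch on top of the app.py above (the resource and route names below are illustrative, not part of the answer, and it assumes a Falcon version where req.media parses application/json bodies):

class TaskStatusFromBody(object):
    def on_post(self, req, resp):
        # expects a JSON body such as {"task_id": "..."}
        task_id = req.media.get('task_id')
        task_result = AsyncResult(task_id)
        resp.status = falcon.HTTP_200
        resp.body = json.dumps({'status': task_result.status, 'result': task_result.result})

app.add_route('/task_status', TaskStatusFromBody())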

tasks.py:

from time import sleep
import celery

app = celery.Celery('tasks', broker='redis://localhost:6379/0', backend='redis://localhost:6379/0')

@app.task
def add(x, y):
    """
    :param int x:
    :param int y:
    :return: int
    """
    # sleep just for demonstration
    sleep(5)
    return x + y

Now we need to start the Celery worker. Go to the project folder and run:

celery -A tasks worker --loglevel=info
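Note that this assumes a Redis server is already listening on localhost:6379, which is the broker/backend URL configured in tasks.py. If Redis is not running yet, it has to be started first, for example with:

redis-server

(any equivalent setup, such as a Redis Docker container exposing port 6379, works as well).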

After that, we need to start the Falcon application. Go to the project folder and run:

gunicorn app:app

OK, everything is ready.
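If you want to verify the Celery side in isolation (without going through Falcon), a quick sanity check run from the project folder could look like the sketch below; it only uses the add task defined above and standard Celery result methods:

from tasks import add

result = add.delay(2, 3)
print(result.id)              # the task_id you would pass to /task_status/{task_id}
print(result.get(timeout=10)) # blocks until the worker finishes (sleep(5) plus the addition); prints 5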

example.py is a small client that helps to understand how it works:

from time import sleep
import requests
# start new task
task_info = requests.get('http://127.0.0.1:8000/start_task')
task_info = task_info.json()
while True:
    # check status of task by task_id while task is working
    result = requests.get('http://127.0.0.1:8000/task_status/' + task_info['task_id'])
    task_status = result.json()
    print(task_status)
    if task_status['status'] == 'SUCCESS' and task_status['result']:
        print('Task with id = %s is finished' % task_info['task_id'])
        print('Result: %s' % task_status['result'])
        break
    # sleep and check status one more time
    sleep(1)

Just run python ./example.py and you should see something like this:

{u'status': u'PENDING', u'result': None}
{u'status': u'PENDING', u'result': None}
{u'status': u'PENDING', u'result': None}
{u'status': u'PENDING', u'result': None}
{u'status': u'PENDING', u'result': None}
{u'status': u'SUCCESS', u'result': 8}
Task with id = 76542904-6c22-4536-99d9-87efd66d9fe7 is finished
Result: 8

Hope this helps.

Danila Ganchar's example above is great and very helpful. I'm using Celery version 4.3.0 with Python 3, and one of the errors I received when using the example above was on this line:

task_result = AsyncResult(task_id)

The error I would receive is:

AttributeError: 'DisabledBackend' object has no attribute '_get_task_meta_for'

This may be a recent change, but result.AsyncResult (or in this case just AsyncResult, since he imported it from celery.result) doesn't know which backend you are using. There are two solutions to this problem:

1) You can take the AsyncResult of the actual task itself, add.AsyncResult(task_id), because the add task already has the backend defined through the @app.task decorator. The downside in this example is that you want to be able to get the result of any task by passing its task_id through the Falcon endpoint, so this approach is limited.

2) The preferred method is to pass an app parameter to AsyncResult:

task = result.AsyncResult(id, app=app)
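Applied to the TaskStatus resource from app.py above, a sketch of the fix could look like this (celery_app is just an import alias used here to avoid clashing with the Falcon app object; it is not a name from the answers above, and falcon/json are already imported in app.py):

from celery.result import AsyncResult
from tasks import app as celery_app  # the Celery instance defined in tasks.py

class TaskStatus(object):
    def on_get(self, req, resp, task_id):
        # passing the Celery app tells AsyncResult which result backend to query
        task_result = AsyncResult(task_id, app=celery_app)
        resp.status = falcon.HTTP_200
        resp.body = json.dumps({'status': task_result.status, 'result': task_result.result})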

Hope this helps!
