由于没有人提供解决方案,加上我迫切需要一个变通的方法,这里是我的情况和一些抽象的解决方案/想法供讨论。
我的堆栈:
- 龙卷风
- 芹菜
- MongoDB
- 复述, RabbitMQ
我的问题:找到龙卷风调度芹菜任务的方法(解决),然后异步收集结果(有什么想法吗?).
场景1:(请求/响应hack + webhook)
- Tornado接收到一个(用户)请求,然后在本地内存(或Redis)中保存一个{jobID:(用户)请求}来记住在哪里传播响应,并使用jobID 触发一个芹菜任务。
- 当celery完成任务时,它在某个url上执行webhook,并告诉tornado这个jobID已经完成(加上结果)
- Tornado检索(用户)请求并将响应转发给(用户)
这会发生吗?这有什么逻辑吗?
场景2:(龙卷风+长轮询)
- Tornado调度芹菜任务并返回一些主要json数据给客户端(jQuery) jQuery在接收到主json时执行一些长轮询,例如每x微秒一次,并根据某些数据库标志进行龙卷风响应。当芹菜任务完成时,这个数据库标志被设置为True,然后jQuery"循环"完成。
这样有效吗?
还有其他的想法/模式吗?
我的解决方案涉及从龙卷风到芹菜的轮询:
class CeleryHandler(tornado.web.RequestHandlerr):
@tornado.web.asynchronous
def get(self):
task = yourCeleryTask.delay(**kwargs)
def check_celery_task():
if task.ready():
self.write({'success':True} )
self.set_header("Content-Type", "application/json")
self.finish()
else:
tornado.ioloop.IOLoop.instance().add_timeout(datetime.timedelta(0.00001), check_celery_task)
tornado.ioloop.IOLoop.instance().add_timeout(datetime.timedelta(0.00001), check_celery_task)
这是我们对这个问题的解决方案。由于我们在应用程序的多个处理程序中查找结果,因此我们将芹菜查找作为一个mixin类。
这也使得使用龙卷风的代码更易读。创模式。
from functools import partial
class CeleryResultMixin(object):
"""
Adds a callback function which could wait for the result asynchronously
"""
def wait_for_result(self, task, callback):
if task.ready():
callback(task.result)
else:
# TODO: Is this going to be too demanding on the result backend ?
# Probably there should be a timeout before each add_callback
tornado.ioloop.IOLoop.instance().add_callback(
partial(self.wait_for_result, task, callback)
)
class ARemoteTaskHandler(CeleryResultMixin, tornado.web.RequestHandler):
"""Execute a task asynchronously over a celery worker.
Wait for the result without blocking
When the result is available send it back
"""
@tornado.web.asynchronous
@tornado.web.authenticated
@tornado.gen.engine
def post(self):
"""Test the provided Magento connection
"""
task = expensive_task.delay(
self.get_argument('somearg'),
)
result = yield tornado.gen.Task(self.wait_for_result, task)
self.write({
'success': True,
'result': result.some_value
})
self.finish()
我偶然发现了这个问题,反复点击结果后端对我来说并不是最佳的。所以我使用Unix Sockets实现了一个类似于场景1的Mixin。
它在任务完成后立即通知Tornado(准确地说,是在链中的下一个任务运行时),并且只命中结果后端一次。
现在,https://github.com/mher/tornado-celery来拯救…
class GenAsyncHandler(web.RequestHandler):
@asynchronous
@gen.coroutine
def get(self):
response = yield gen.Task(tasks.sleep.apply_async, args=[3])
self.write(str(response.result))
self.finish()