我在celery
工作进程中使用tornado.ioloop
,因为我需要使用mongodb。
class WorkerBase():
@gen.engine
def foo(self,args,callback)
bar = ['Python','Celery','Javascript','HTML']
# ... process something ....
callback(bar)
@gen.engine
def RunMyTask(self,args):
result = yield gen.Task(self.foo,args=args)
# Stop IOLoop instance
IOLoop.instance().stop()
@task(name="MyWorker",base=WorkerBase)
def CeleryWorker(args):
# This works because i'm adding base as WorkerBase
CeleryWorker.RunMyTask(args)
IOLoop.instance().start()
return True
当我调用一个任务时,它会给出一个错误:
[2014-10-02 12:12:11,561: ERROR/Worker-4] Exception in callback None
Traceback (most recent call last):
File "/var/www/myapp/env/local/lib/python2.7/site-packages/tornado/ioloop.py", line 832, in start
fd_obj, handler_func = self._handlers[fd]
KeyError: 16
或
[2014-10-02 12:12:11,561: ERROR/Worker-4] Exception in callback None
Traceback (most recent call last):
File "/var/www/myapp/env/local/lib/python2.7/site-packages/tornado/ioloop.py", line 832, in start
fd_obj, handler_func = self._handlers[fd]
KeyError: 14
这些错误并不一致。有加薪条件吗?
这看起来像是一个线程问题。我不熟悉芹菜的线程模型,但它看起来像是启动了CeleryWorker的多个副本,每个副本都试图运行相同的单例IOLoop.instance()。如果要这样运行,每个工作线程都需要自己的IOLoop-看看同步tornado.httpclient.httpclient如何创建和运行临时IOLoop
看起来您的worker任务刚刚返回,并且在ioloop停止之前被视为已完成,所以gen.engine的回调可能找不到原始的stack_context。
@task(name="MyWorker",base=WorkerBase) def CeleryWorker(args): # This works because i'm adding base as WorkerBase CeleryWorker.RunMyTask(args) IOLoop.instance().start() return True
我有一些建议给你
1) 删除返回
@task(name="MyWorker",base=WorkerBase)
def CeleryWorker(args):
# This works because i'm adding base as WorkerBase
CeleryWorker.RunMyTask(args)
IOLoop.instance().start()
2) 使用run_sync
import functools
@task(name="MyWorker",base=WorkerBase)
def CeleryWorker(args):
# This works because i'm adding base as WorkerBase
func = functools.partial(CeleryWorker.RunMyTask, args)
IOLoop.instance().run_sync(func)