芹菜的第一步- result.get()超时错误



我在这里遵循芹菜第一步教程:http://celery.readthedocs.org/en/latest/getting-started/first-steps-with-celery.html#keeping-results

我是按照教程,与RabbitMQ。

当我做result.get(timeout=1)时,它显示一个超时错误,即使它是一个简单的添加操作,我可以看到工人运行并产生正确的结果(8)在另一个窗口

(venv) C:Voltcelerytest>ipython
Python 2.7.6 (default, Nov 10 2013, 19:24:18) [MSC v.1500 32 bit (Intel)]
Type "copyright", "credits" or "license" for more information.
IPython 2.1.0 -- An enhanced Interactive Python.
?         -> Introduction and overview of IPython's features.
%quickref -> Quick reference.
help      -> Python's own help system.
object?   -> Details about 'object', use 'object??' for extra details.
In [1]: from tasks import add
In [2]: a = add(1,3)
In [3]: a
Out[3]: 4
In [4]: a = add.delay(1,3)
In [5]: a.ready()
Out[5]: False
In [6]: a = add.delay(4,4)
In [7]: a.get(timeout=0.5)
---------------------------------------------------------------------------
TimeoutError                              Traceback (most recent call last)
<ipython-input-7-2c407a92720e> in <module>()
----> 1 a.get(timeout=0.5)
C:UsersSomEnvsvenvlibsite-packagesceleryresult.pyc in get(self, timeout,
 propagate, interval, no_ack, follow_parents)
    167                 interval=interval,
    168                 on_interval=on_interval,
--> 169                 no_ack=no_ack,
    170             )
    171         finally:
C:UsersSomEnvsvenvlibsite-packagescelerybackendsamqp.pyc in wait_for(se
lf, task_id, timeout, cache, propagate, no_ack, on_interval, READY_STATES, PROPA
GATE_STATES, **kwargs)
    155                                     on_interval=on_interval)
    156             except socket.timeout:
--> 157                 raise TimeoutError('The operation timed out.')
    158
    159         if meta['status'] in PROPAGATE_STATES and propagate:
TimeoutError: The operation timed out.
In [8]:

tasks.py文件

from celery import Celery
app = Celery('tasks', backend='amqp', broker='amqp://')

@app.task
def add(x, y):
    return x + y

工作日志

[tasks]
  . tasks.add
[2014-07-17 13:00:33,196: INFO/MainProcess] Connected to amqp://guest:**@127.0.0
.1:5672//
[2014-07-17 13:00:33,211: INFO/MainProcess] mingle: searching for neighbors
[2014-07-17 13:00:34,220: INFO/MainProcess] mingle: all alone
[2014-07-17 13:00:34,240: WARNING/MainProcess] celery@SomsPC ready.
[2014-07-17 13:00:34,242: INFO/MainProcess] Received task: tasks.add[85ff75d8-38
b5-442a-a574-c8b976a33739]
[2014-07-17 13:00:34,243: INFO/MainProcess] Task tasks.add[85ff75d8-38b5-442a-a5
74-c8b976a33739] succeeded in 0.000999927520752s: 4
[2014-07-17 13:00:46,582: INFO/MainProcess] Received task: tasks.add[49de7c6b-96
72-485d-926e-a4e564ccc89a]
[2014-07-17 13:00:46,588: INFO/MainProcess] Task tasks.add[49de7c6b-9672-485d-92
6e-a4e564ccc89a] succeeded in 0.00600004196167s: 8

我在经历了"芹菜的第一步"之后遇到了完全相同的问题。

我认为原因是backend='amqp'

为我工作的设置如下:

app = Celery('tasks', broker='amqp://guest@localhost//')
app.conf.CELERY_RESULT_BACKEND = 'db+sqlite:///results.sqlite'

根据文档,当使用AMQP结果后端时,每个结果只能检索一次(它实际上是查询中的单个消息)。

我想,您的工作进程检索它是为了将结果打印到控制台:

Task tasks.add[49de7c6b-9672-485d-926e-a4e564ccc89a] succeeded in 0.00600004196167s: 8

,因此您无法再次检索相同的结果

如果你看看这个线程,似乎设置--pool=solo也解决了这个问题。

有时我也收到TimeoutError与redis,所以我实现辅助功能:

celery_app.update(
    redis_socket_timeout=5,
    redis_socket_connect_timeout=5,
)

def run_task(task, *args, **kwargs):
    timeout = 2 * 60
    future = task.apply_async(args, kwargs)
    time_end = time.time() + timeout
    while True:
        try:
            return future.get(timeout=timeout)
        except redis.TimeoutError:
            if time.time() < time_end:
                continue
            raise

我知道我的答案晚了,但也许它会帮助别人。

你只需要在配置完后端后重新启动已经在运行的worker。您可以在"第一步"页面上找到相关信息,但只能在文章的最后。

确保你没有任何老工人还在运行。

很容易意外启动多个工人,所以要确保在启动一个新worker之前,将正确关闭先前的worker。

没有配置预期结果后端的旧worker可能正在运行并且正在劫持任务。

最新更新