芹菜工人睡眠不正常



我有下一个问题,我在 Python 上使用一个必须等待 X 秒的进程,进程本身工作正常,问题是当我把它作为任务放在芹菜上时。

当工作线程尝试对一项任务执行 time.sleep(X) 时,它会暂停工作线程中的所有任务,例如:

我有工作线程 A,它可以同时执行 4 个任务(q、w、e 和 r),任务 r 的睡眠时间为 1800 秒,因此工作线程同时执行 4 个任务,但是当 r 任务执行睡眠时,工人也会停止 q、w 和 e。

这正常吗?你知道我该怎么解决这个问题吗?

编辑:这是用我的节拍和队列 celery.py 的示例

app.conf.update(
CELERY_DEFAULT_QUEUE='default',
CELERY_QUEUES=(
    Queue('search', routing_key='search.#'),
    Queue('tests', routing_key='tests.#'),
    Queue('default',    routing_key='tasks.#'),
),
CELERY_DEFAULT_EXCHANGE='tasks',
CELERY_DEFAULT_EXCHANGE_TYPE='topic',
CELERY_DEFAULT_ROUTING_KEY='tasks.default',
CELERY_TASK_RESULT_EXPIRES=10,
CELERYD_TASK_SOFT_TIME_LIMIT=1800,
CELERY_ROUTES={
    'tests.tasks.volume': {
        'queue': 'tests',
        'routing_key': 'tests.volume',
    },
    'tests.tasks.summary': {
        'queue': 'tests',
        'routing_key': 'tests.summary',
    },
    'search.tasks.links': {
        'queue': 'search',
        'routing_key': 'search.links',
    },
    'search.tasks.urls': {
        'queue': 'search',
        'routing_key': 'search.urls',
    },
},
CELERYBEAT_SCHEDULE={
    # heavy one
    'each-hour-summary': {
        'task': 'tests.tasks.summary',
        'schedule': crontab(minute='0', hour='*/1'),
        'args': (),
    },
    'each-hour-volume': {
        'task': 'tests.tasks.volume',
        'schedule': crontab(minute='0', hour='*/1'),
        'args': (),
    },
    'links-each-cuarter': {
        'task': 'search.tasks.links',
        'schedule': crontab(minute='*/15'),
        'args': (),
    },
    'urls-each-ten': {
        'schedule': crontab(minute='*/10'),
        'task': 'search.tasks.urls',
        'args': (),
    },
}
)

test.tasks.py

@app.task
def summary():
    execute_sumary() #heavy task ~ 1 hour aprox
@app.task
def volume():
    execute_volume() #no important ~ less than 5 minutes

和 search.tasks.py

@app.task
def links():
    free = search_links() #return boolean
    if free:
        process_links()
    else:
        time.sleep(1080) #<--------sleep with which I have problems
    process_links()
@app.task
def urls():
    execute_urls() #no important ~ less than 1 minute

好吧,我有 2 个工人,A 用于队列搜索,B 用于测试和 defaul。

问题出在 A 上,当它接受任务"链接"并执行 time.sleep() 时,它会停止工作线程正在执行的其他任务。

因为工作线程 B 工作正常,我认为问题是 time.sleep() 函数。

如果你只有一个进程/线程,调用sleep()会阻止它。这意味着不会运行其他任务...

您设置了CELERYD_TASK_SOFT_TIME_LIMIT=1800,但您的睡眠时间为1080。在此时间间隔内只能工作一个或两个任务。套装CELERYD_TASK_SOFT_TIME_LIMIT>(1080+(工作时间))*3启动芹菜工作器时设置更多 --并发 (> 4)。

相关内容

  • 没有找到相关文章

最新更新