我的示例代码是:
- run.py
from celery import Celery, Task
app = Celery(
"app",
task_serializer='json',
accept_content=['json'],
result_serializer='json',
timezone='Asia/Tokyo',
enable_utc=True,
backend='redis://127.0.0.1:6369/2',
broker='redis://127.0.0.1:6369/3',
include=['app.tasks']
)
app.conf.task_routes = {'app.tasks.long_run': {'queue': 'long_running_task'}}
class VerboseTask(Task):
def on_failure(self, exc, task_id, args, kwargs, einfo):
print("-------------------------------------------")
print(
f"FAILURE({task_id}):"
f"{self.request.task}{kwargs}, Exception={exc}, ErrorInfo={einfo}"
)
print("-------------------------------------------")
def on_success(self, retval, task_id, args, kwargs):
print("-------------------------------------------")
print(f"SUCCESS({task_id}):{self.request.task}{kwargs}")
print("-------------------------------------------")
def on_retry(self, exc, task_id, args, kwargs, einfo):
print("-------------------------------------------")
print(f"RETRY({task_id}):{self.request.task}{kwargs}")
print("-------------------------------------------")
- app/tasks.py
from run import app, VerboseTask
from time import sleep
class TSK:
name = 'tsk'
@app.task(bind=True, base=VerboseTask)
def short_run(self, **kwargs):
for i in range(0, 3):
print(f'{self.request.task} [{self.request.id[0:5]}] → {i*5} ~ {(i+1)*5}')
sleep(5)
print('short_run finished')
@app.task(bind=True, base=VerboseTask)
def long_run(self, **kwargs):
for i in range(0, 10):
print(f'{self.request.task} [{self.request.id[0:5]}] → {i*5} ~ {(i+1)*5}')
sleep(5)
print('long_run finished')
我用经营芹菜
celery -A run worker -l info --concurrency=4 -Q long_running_task,celery
问题是,当我调用long_running_task
四次,然后调用普通的celery
任务时,long_running_task
消耗了所有的工人,而celery
任务需要等待四个长时间运行的任务。
我想至少保留两名工人来完成短期任务。例如,如果当前任务是两个短期运行的任务和一个长期运行的任务,则只有一个工作线程可用,并且只有短期运行任务可以运行。长时间运行的任务必须等待,而三个工人将在同一时间空闲来消耗其中一个。
换句话说,将长时间运行的任务的最大并发限制为2
也是一种可能的解决方案。
我知道我可能会运行两个并发分离的Celery实例:
celery -A run worker -l info --concurrency=2 -n worker1@%h -Q long_running_task
celery -A run worker -l info --concurrency=2 -n worker2@%h -Q celery
但是
- 如果其他任务空闲,我也想消耗
long_running_task
工作人员(这不是那么重要的条件,可能会被忽略( - 我使用docker容器,只想在其中保留一个
ENTRYPOINT
(换句话说,我不想通过运行单独的命令来分隔工作程序。我只想执行一次celery -A run worker ...
(
有什么方法可以为满足我的条件的工作人员自定义限制和并发性吗?
您可以让一个工作人员从多个队列中消费。如果你有一个高优先级队列和一个长运行队列,你可以为高优先级队列指定一个专用的工作人员,以确保总是有一个工作人员准备好接那些,然后让其他人同时使用高优先级和长运行任务。
至于入口点,只需为入口点使用一个shell脚本,该脚本接受一个参数(要使用的队列列表(,因此您只有1个docker映像,但有2个部署:1个用于high_priority队列,1个用于两个队列,并且您可以根据负载分别扩展每个部署。