如何在Celery中设置每个任务(或队列)的并发限制



我的示例代码是:

  • run.py
from celery import Celery, Task
app = Celery(
"app",
task_serializer='json',
accept_content=['json'],
result_serializer='json',
timezone='Asia/Tokyo',
enable_utc=True,
backend='redis://127.0.0.1:6369/2',
broker='redis://127.0.0.1:6369/3',
include=['app.tasks']
)
app.conf.task_routes = {'app.tasks.long_run': {'queue': 'long_running_task'}}

class VerboseTask(Task):
def on_failure(self, exc, task_id, args, kwargs, einfo):
print("-------------------------------------------")
print(
f"FAILURE({task_id}):"
f"{self.request.task}{kwargs}, Exception={exc}, ErrorInfo={einfo}"
)
print("-------------------------------------------")
def on_success(self, retval, task_id, args, kwargs):
print("-------------------------------------------")
print(f"SUCCESS({task_id}):{self.request.task}{kwargs}")
print("-------------------------------------------")
def on_retry(self, exc, task_id, args, kwargs, einfo):
print("-------------------------------------------")
print(f"RETRY({task_id}):{self.request.task}{kwargs}")
print("-------------------------------------------")
  • app/tasks.py
from run import app, VerboseTask
from time import sleep

class TSK:
name = 'tsk'
@app.task(bind=True, base=VerboseTask)
def short_run(self, **kwargs):
for i in range(0, 3):
print(f'{self.request.task} [{self.request.id[0:5]}] → {i*5} ~ {(i+1)*5}')
sleep(5)
print('short_run finished')
@app.task(bind=True, base=VerboseTask)
def long_run(self, **kwargs):
for i in range(0, 10):
print(f'{self.request.task} [{self.request.id[0:5]}] → {i*5} ~ {(i+1)*5}')
sleep(5)
print('long_run finished')

我用经营芹菜

celery -A run worker -l info --concurrency=4 -Q long_running_task,celery

问题是,当我调用long_running_task四次,然后调用普通的celery任务时,long_running_task消耗了所有的工人,而celery任务需要等待四个长时间运行的任务。

我想至少保留两名工人来完成短期任务。例如,如果当前任务是两个短期运行的任务和一个长期运行的任务,则只有一个工作线程可用,并且只有短期运行任务可以运行。长时间运行的任务必须等待,而三个工人将在同一时间空闲来消耗其中一个。

换句话说,将长时间运行的任务的最大并发限制为2也是一种可能的解决方案。

我知道我可能会运行两个并发分离的Celery实例:

celery -A run worker -l info --concurrency=2 -n worker1@%h -Q long_running_task
celery -A run worker -l info --concurrency=2 -n worker2@%h -Q celery

但是

  1. 如果其他任务空闲,我也想消耗long_running_task工作人员(这不是那么重要的条件,可能会被忽略(
  2. 我使用docker容器,只想在其中保留一个ENTRYPOINT(换句话说,我不想通过运行单独的命令来分隔工作程序。我只想执行一次celery -A run worker ...(

有什么方法可以为满足我的条件的工作人员自定义限制和并发性吗?

您可以让一个工作人员从多个队列中消费。如果你有一个高优先级队列和一个长运行队列,你可以为高优先级队列指定一个专用的工作人员,以确保总是有一个工作人员准备好接那些,然后让其他人同时使用高优先级和长运行任务。

至于入口点,只需为入口点使用一个shell脚本,该脚本接受一个参数(要使用的队列列表(,因此您只有1个docker映像,但有2个部署:1个用于high_priority队列,1个用于两个队列,并且您可以根据负载分别扩展每个部署。

相关内容

  • 没有找到相关文章

最新更新