芹菜:每个工人task_acks_late的不同设置/为芹菜添加自定义选项



这个问题是 django + 芹菜的后续:禁用一个工人的预取,有错误吗?

我在芹菜方面遇到了问题(请参阅我跟进的问题(,为了解决这个问题,我想有两个芹菜工人,每个芹菜工人的 -concurrency 1 但有两种不同的task_acks_late设置。

我目前的方法有效,但在我看来不是很漂亮。我正在执行以下操作:

在我的 Django 项目的settings.py

CELERY_TASK_ACKS_LATE = os.environ.get("LACK", "False") == "True"

这允许我使用以下命令启动芹菜工作器:

LACK=True celery -A miniclry worker --concurrency=1 -n w2 -Q=fast,slow --prefetch-multiplier 1 
celery -A miniclry worker --concurrency=1 -n w1 -Q=fast

更直观的是,如果我能做这样的事情:

celery -A miniclry worker --concurrency=1 -n w2 -Q=fast,slow --prefetch-multiplier 1 --late-ack=True
celery -A miniclry worker --concurrency=1 -n w1 -Q=fast --late-ack=False

我发现了用不同的值初始化不同的芹菜工人,但不明白如何将它嵌入到我的 django/芹菜上下文中。我必须在哪些文件中添加向解析器添加参数的代码,以及如何使用自定义参数来修改task_acks_late芹菜设置。

更新:感谢@Greenev的回答,我设法为芹菜添加了自定义选项。然而,似乎使用这种机制更改配置"来得太晚了",并且没有考虑到chagne。

这里的一个可能的解决方案是提供acks_late=True作为shared_task装饰器的参数,给定上一个问题中的代码:

@shared_task(acks_late=True)
def task_fast(delay=0.1):
logger.warning("fast in")
time.sleep(delay)
logger.warning("fast out")

上。我没有使用这种方法设置task_acks_late,但您可以添加命令行参数,如下所示。

你已链接到解决方案。我在这里看不到任何 django 细节,只需将parser.add_argument代码放在您定义app的地方,给定上一个问题中的代码,您将得到如下所示的内容:

app = Celery("miniclry", backend="rpc", broker="pyamqp://")
app.config_from_object('django.conf:settings', namespace='CELERY')
def add_worker_arguments(parser):
parser.add_argument('--late-ack', default=False)
app.user_options['worker'].add(add_worker_arguments)

然后,您可以在信号处理程序中访问celeryd_init参数值

@celeryd_init.connect
def configure_worker(sender=None, conf=None, options=None, **kwargs):
conf.task_acks_late = options.get('late-ack') # get custom argument value from options

相关内容

  • 没有找到相关文章