在一个 Celery 任务中执行多个 Celery 任务



我希望在一个 Celery 任务中执行多个相关的 Celery 任务。这可能吗?

下面的代码可以更清楚地说明我想要实现的效果:

@shared_task(ignore_result=True)
def test_job():
    """Dispatch ``another_test_job`` and ``yet_another_test_job`` from this task.

    Each child is submitted with ``delay()`` and its result is never waited
    on here. Any exception raised while dispatching is logged and re-raised.
    """
    try:
        for child_job in (another_test_job, yet_another_test_job):
            # NOTE(review): delay() is Celery's shortcut for apply_async(), so
            # the children are presumably sent to the broker for a worker to
            # run rather than executed inline -- confirm against Celery docs.
            child_job.delay()
    except Exception as exc:
        logger.error(f'Something happened when collecting edr data ===> {exc}')
        raise

@shared_task(ignore_result=True)
def another_test_job():
    """Print a fixed marker line; log and re-raise any exception."""
    try:
        print('this is another test')
    except Exception as exc:
        failure = f'Something happened when collecting edr data ===> {exc}'
        logger.error(failure)
        raise

@shared_task(ignore_result=True)
def yet_another_test_job():
    """Print a fixed marker line; log and re-raise any exception."""
    try:
        print('this is yet another test')
    except Exception as exc:
        failure = f'Something happened when collecting edr data ===> {exc}'
        logger.error(failure)
        raise

我的目标是减少需要调度的定时任务数量,本质上是把多个任务归并到一个任务之下。

我尝试使用 test_job.apply() 手动执行该任务。

结果如下:

In [2]: test_job.apply()                                                                                                                                       
2021-09-30 13:57:49,196 amqp                                DEBUG    Start from server, version: 0.9, properties: {'capabilities': {'publisher_confirms': True, 'exchange_exchange_bindings': True, 'basic.nack': True, 'consumer_cancel_notify': True, 'connection.blocked': True, 'consumer_priorities': True, 'authentication_failure_close': True, 'per_consumer_qos': True, 'direct_reply_to': True}, 'cluster_name': 'someclustername', 'copyright': 'Copyright (c) 2007-2020 VMware, Inc. or its affiliates.', 'information': 'Licensed under the MPL 2.0. Website: https://rabbitmq.com', 'platform': 'Erlang/OTP 22.3.4.11', 'product': 'RabbitMQ', 'version': '3.8.9'}, mechanisms: [b'PLAIN'], locales: ['en_US']
2021-09-30 13:57:49,259 amqp                                DEBUG    using channel_id: 1
2021-09-30 13:57:49,307 amqp                                DEBUG    Channel open
2021-09-30 13:57:49,431 celery.app.trace                    INFO     Task myapp.tasks.jobs.test_job[3d0c3a93-157c-46bf-b592-ebce0850c49e] succeeded in 0.4412529049999989s: None
Out[2]: <EagerResult: 3d0c3a93-157c-46bf-b592-ebce0850c49e>

如果我直接测试单个任务,它运行正常,例如 another_test_job.apply():

In [3]: another_test_job.apply()                                                                                                                                                            
this is another test
2021-09-30 13:59:38,601 celery.app.trace                    INFO     Task myapp.tasks.jobs.another_test_job[6499aaff-bc88-4df1-9f77-78ff68f29915] succeeded in 0.00015368999999054722s: None
Out[3]: <EagerResult: 6499aaff-bc88-4df1-9f77-78ff68f29915>
谁能给我解释一下发生了什么事?

不幸的是,它不是这样工作的。您需要创建一个 Celery 组(group),并触发该组的执行。为了把所有执行"关联"(tie)起来,可以在把任务添加到组时向任务传入一个变量,这样就能用这个引用让所有任务都向一个中心资源报告。然后检查该中心资源,确认组中的所有任务是否都已报告。

我把自己用来通过 Celery 组管理任务的代码贴在这里。你确实需要一个中心资源(比如数据库),并让你的任务向它报告,这样你就可以到那里检查进度。不过我分享的代码可以帮助您了解如何管理 Celery 组。我使用这些类,是为了在编码时不必处理 Celery 的技术细节。

其中的 Django 依赖项可以移除,或根据您的项目需求替换为其他实现。

import importlib
from celery import group
from celery.result import AsyncResult
from django.core.exceptions import ValidationError
from django.conf import settings

class CeleryEntity:
    """Base class pairing a Celery identifier with a resolved Celery app.

    The app is looked up from the ``CELERY_TOOLS_APP`` setting, which is read
    as an indexable pair: item 0 is a module path passed to
    ``importlib.import_module`` and item 1 is the attribute name of the app
    object inside that module.
    """

    def __init__(self, uuid=None):
        """Store ``uuid`` and resolve the Celery app from settings."""
        self.uuid = uuid
        self._celery_app = None
        self._result = None
        self._initialize()

    def __str__(self):
        # __str__ must return a str; uuid defaults to None and may be a
        # non-string identifier, so convert explicitly.
        return str(self.uuid)

    def _initialize(self, celery_app=None):
        """Set the Celery app, resolving it from settings when not given.

        When ``celery_app`` is falsy and ``CELERY_TOOLS_APP`` is not
        configured, the app is left as ``None`` so that
        ``_validate_celery_app`` can report the problem instead of this
        method failing on ``None[0]``.
        """
        if not celery_app:
            celery_app_reference = getattr(settings, 'CELERY_TOOLS_APP', None)
            if celery_app_reference:
                module_path = celery_app_reference[0]
                attribute_name = celery_app_reference[1]
                celery_app = getattr(
                    importlib.import_module(module_path), attribute_name)
        self._celery_app = celery_app

    def _validate_celery_app(self):
        """Ensure a Celery app is available.

        Raises:
            ValidationError: if no app is set and none can be resolved from
                ``CELERY_TOOLS_APP``.
        """
        # Only resolve from settings when nothing is set yet, so an app
        # supplied through _initialize(celery_app=...) is not overwritten.
        if not self._celery_app:
            self._initialize()
        if not self._celery_app:
            raise ValidationError(
                'No Celery App Object. Check your CELERY_TOOLS_APP on settings.py')

class CeleryTaskGroup(CeleryEntity):
    """Collect task signatures and run them together as one Celery group.

    ``__str__`` and ``_initialize`` are inherited from ``CeleryEntity``; the
    copies previously defined here were identical to the parent's.
    """

    def __init__(self, uuid=None, save=False):
        """Create a group wrapper.

        Args:
            uuid: Identifier of an existing group, or None for a new one.
            save: When true, ``run_now`` calls ``save()`` on the group result.
        """
        super().__init__(uuid)
        self.save = save
        self._tasks = []
        self._result_set = None

    def _validate_tasks(self):
        """Raise ValidationError when no task has been appended."""
        if not self._tasks:
            # The error must be raised; returning it let empty groups through.
            raise ValidationError(
                'No Tasks defined. Add tasks using append_task method')

    def _get_result_set(self):
        """Return the group's result set, fetching it from the backend once.

        NOTE(review): restore_group presumably only finds groups whose result
        was saved (see ``save``) -- confirm against the result backend in use.
        """
        if not self._result_set:
            self._validate_celery_app()
            self._result_set = self._celery_app.backend.restore_group(
                str(self.uuid))
        return self._result_set

    def length(self):
        """Return the number of task signatures appended so far."""
        return len(self._tasks)

    def stats(self):
        """Return a dict with the group's ``ready``, ``successful`` and
        ``completed`` values; all are falsy/zero when no result set exists."""
        results = self._get_result_set()
        if results:
            ready = results.ready()
            successful = results.successful()
            completed = results.completed_count()
        else:
            ready = False
            successful = False
            completed = 0
        return {
            'ready': ready,
            'successful': successful,
            'completed': completed,
        }

    def append_task(self, task_name, params=()):
        """Add a signature for ``task_name`` called with ``params``."""
        self._tasks.append(self._celery_app.signature(task_name, params))

    def run_now(self, queue=None, wait=False):
        """Submit all appended tasks as a group and return the group result.

        Args:
            queue: Passed through to ``apply_async``.
            wait: When true, block on ``join()`` before returning.

        Raises:
            ValidationError: if no Celery app is available or no task has
                been appended.
        """
        self._validate_celery_app()
        self._validate_tasks()
        job = group(self._tasks)
        group_results = job.apply_async(queue=queue)
        if self.save:
            group_results.save()
        # Remember the id so the group can be looked up or revoked later.
        self.uuid = group_results.id
        if wait:
            group_results.join()
        return group_results

    def revoke_process(self):
        """Revoke (with terminate=True) the stored group, then delete it."""
        grp = self._get_group()
        if grp:
            grp.revoke(terminate=True)
            self._delete_group()

    def _get_group(self):
        """Return a GroupResult for this uuid, or None if nothing is stored."""
        self._validate_celery_app()
        grp = self._celery_app.backend.restore_group(str(self.uuid))
        # NOTE(review): restore_group may already return a GroupResult, which
        # would make this wrapping redundant -- confirm before simplifying.
        if not grp:
            return None
        return self._celery_app.GroupResult(id=str(self.uuid), results=grp)

    def _delete_group(self):
        """Delete this group's stored result from the backend."""
        self._validate_celery_app()
        self._celery_app.backend.delete_group(str(self.uuid))

class CeleryTask(CeleryEntity):
    """Run, inspect and revoke a single Celery task identified by ``uuid``."""

    def _get_result(self):
        """Return the AsyncResult for the current uuid, creating it once."""
        if not self._result:
            self._validate_celery_app()
            self._result = AsyncResult(str(self.uuid), app=self._celery_app)
        return self._result

    def status(self):
        """Return the task state, or ``'NO_TASK'``.

        ``'NO_TASK'`` is returned when the uuid is None (the result id is then
        the string ``'None'``) or when the worker inspection returns nothing.
        """
        result = self._get_result()
        # Check the id first so no worker inspection is issued when there is
        # no task to look up.
        if result.id == 'None':
            return 'NO_TASK'
        workers = self._celery_app.control.inspect().registered()
        if not workers:
            return 'NO_TASK'
        return result.state

    def revoke(self):
        """Revoke the task with ``terminate=True``."""
        result = self._get_result()
        result.revoke(terminate=True)

    def run(self, task_name, params, queue=None):
        """Submit ``task_name`` with ``params`` and track the new task id.

        Args:
            task_name: Name used to build the task signature.
            params: Positional arguments for the signature.
            queue: Passed through to ``apply_async``.

        Returns:
            The result object returned by ``apply_async``.

        Raises:
            ValidationError: if no Celery app is available.
        """
        self._validate_celery_app()
        result = self._celery_app.signature(task_name, params).apply_async(
            queue=queue)
        self.uuid = result.id
        # Drop any cached AsyncResult: it belongs to the previous uuid, and
        # status()/revoke() must act on the task that was just submitted.
        self._result = None
        return result

相关内容

  • 没有找到相关文章

最新更新