我有以下代码。这很好用。但我正在get_host_types()
中的for循环中迭代度量。我想从get_host_types()
函数为每个度量创建子任务,这些子任务将调用芹菜任务get_host_type()
。这将允许子任务在工作节点上独立运行。我想等待方法get_host_types()
中的结果并返回结果。我想用group()
。但是我不能在AsyncResult()
上呼叫.get()
。如果我不并行化,我就不会利用分布式任务框架来加快主任务的速度。
from __future__ import print_function
from celery import Celery, group
import requests
app = Celery('celery_test')
app.config_from_object('config')
@app.task
def get_host_type(metric, alert):
host_types = get_host_types_for_alert(alert['alert_id'], metric)
return host_types
class MyObject(dict):
def __init__(self, alert, host_types):
dict.__init__(self, alert=alert, host_types=host_types)
@app.task(serializer='json')
def get_host_types(my_obj):
print(f"alert get_host_types ============> {my_obj}")
alert = my_obj['alert']['alert']
metrics = my_obj['host_types']
ret_val = set()
for m in metrics:
res = alert_id_host_type_mapper.get_host_types_for_alert(alert['id'], m)
ret_val.update(res)
print(f"Return value ======> {ret_val}")
return list(ret_val)
@app.task
def get_metrics(alert):
print(f" alert ==> {alert}")
#alert = alert[0]
metric = alerts_client.get_metrics(alert['alert'])
metrics = alert_id_host_type_mapper.metric_parser(metric)
return MyObject(alert, metrics)
@app.task
def get_alert(alert_id):
print(f" =====> alert id {alert_id}")
return alerts_client.get_alerts(alert_id)
if __name__ == "__main__":
res = (get_alert.s(267483) | get_metrics.s() | get_host_types.s()).apply_async()
print(res.get())
编辑:如果在子任务中执行result.get()
,则会出现以下错误。
[2022-02-10 19:36:03,904: WARNING/ForkPoolWorker-42] Exception in thread Thread-7:
Traceback (most recent call last):
File "/usr/lib/python3.6/threading.py", line 916, in _bootstrap_inner
self.run()
File "/home/sshil/code/statsboard/statsboard/celery_test/celery_test.py", line 30, in run
result_int = res.get()
File "/home/sshil/venv/sb_venv/lib/python3.6/site-packages/celery/result.py", line 680, in get
on_interval=on_interval,
File "/home/sshil/venv/sb_venv/lib/python3.6/site-packages/celery/result.py", line 793, in join_native
assert_will_not_block()
File "/home/sshil/venv/sb_venv/lib/python3.6/site-packages/celery/result.py", line 37, in assert_will_not_block
raise RuntimeError(E_WOULDBLOCK)
RuntimeError: Never call result.get() within a task!
See http://docs.celeryq.org/en/latest/userguide/tasks.html#task-synchronous-subtasks
您可以像下面的一样使用Celery Group
@app.task(serializer='json')
def get_host_types(my_obj):
print(f"alert get_host_types ============> {my_obj}")
alert = my_obj['alert']['alert']
metrics = my_obj['host_types']
ret_val = []
tasks = []
for m in metrics:
tasks.append(get_host_type.s(m,alert['id']))
# create a group with all the tasks
job = group(tasks)
result = job.apply_async()
ret_val = result.get(disable_sync_subtasks=False)
return ret_val
有关芹菜集团的更多信息,请参阅->http://ask.github.io/celery/userguide/groups.html#groups