尝试使用芹菜进行以下情况:
示例:
-
具有一组不同的测试(每个测试都有一个dicts供使用)
-
有5个不同的环境来运行这些测试。
我可以在5个EVN上同时执行一个测试(多处理)。现在试图制作芹菜。
问题是如何执行测试,它们不应重叠在一个Env中。
因此,一个env只能同时运行一个测试,但是与工人和队列的芹菜可以执行所有混搭。我尝试使用"帆布原始图",然后在循环中添加" test evns"。
喜欢:
tests_set_list = [{test1}, {test2}, {test3}]
envs_list = [{alpha}, {bravo}, {charlie}, {delta}, {echo}]
变体1:在这种情况下
"""This loop create tasks for each env"""
for env_item in envs_list:
for test_item in tests_set_list:
r_key = 'tests_run.bunch_test_execute_task.{0}.{1}'.format(test_item['test_folder_name'],
env_item['env_codename'])
bunch_test_execute_task.apply_async(kwargs={'env_item': env_item,
'test_item': [test_item],
'tkn_branch': branch,
},
queue='tests_run',
routing_key=r_key)
变体2: -> typeError:'asyncresult'对象无法订阅在这种情况下,任务仅添加第一次迭代,然后停止。
""" This loop create env inst for each task. Works in group
Works bad, did not add task in worker queue.
"""
list_of_groups = []
for test_item in tests_set_list:
tasks_group_by_env = []
for env_item in synced_envs:
r_key = 'tests_run.bunch_test_execute_env_q_task.{0}.{1}'.format(test_item['test_folder_name'],
env_item['env_codename'])
task = bunch_test_execute_env_q_task(env_item, [test_item], branch)
tasks_group_by_env.append(task)
groups = group(tasks_group_by_env)
list_of_groups.append(groups)
for group_item in list_of_groups:
# job = group(*group_item)
job = group_item
job.apply_async(queue='tests_run')
变体3: -> typeError:不可订购类型:float()> = str()但是无论如何,任务是在队列中添加的,但是以重叠的方式执行。
""" This loop use one test item and iter envs from list of synced envs to add groups """
list_of_groups = []
for test_item in tests_set_list:
r_key = 'tests_run.bunch_test_execute_env_q_task.{0}.{1}'
lazy_group = group(bunch_test_execute_env_q_task.apply_async(
args=[env_item['env_codename']],
kwargs={'env_item': env_item,
'test_item': [test_item],
'tkn_branch': branch,
},
queue='tests_run',
routing_key=r_key.format(test_item['pattern_folder_name'],
env_item['env_codename']))for env_item in envs_list)
# list_of_tasks.append(task)
list_of_groups.append(lazy_group)
all_groups = group(*list_of_groups)
all_groups()
我想实现的摘要:每个测试将获得单独的测试env。为此,这是一个不同的工作:
job_1 = alpha(test1)
job_2 = bravo(test1)
job_3 = charlie(test1)
job_4 = delta(test1)
job_5 = echo(test1)
...
job_n+1 = alpha(test_n+1)
job_n+1 = bravo(test_n+1)
job_n+1 = charlie(test_n+1)
job_n+1 = delta(test_n+1)
job_n+1 = echo(test_n+1)
这似乎可以简单地通过为每个测试env添加单独的工作者来实现,因此它们永远不会彼此重叠,因为在单独的流程工作人员中运行,但是它并不优雅,因为环境可以不时改变。
简单的答案是使用队列:
PY上的样子: - 这只是为现有工人添加队列。如何增加工人 - 请参阅芹菜妖魔文档
app.control.add_consumer(
queue='alpha',
exchange = 'tests_run',
exchange_type = 'direct',
routing_key = 'alpha.*',
destination = ['alpha@tentacle'])
然后,当您要同时执行5个不同的任务而没有任何重叠(例如,使用5个diff envs)时,只需添加任务:
queue =" desipable_queue"
TestExecTasks().your_task.apply_async(
args=[arg1,arg2],
kwargs=dict(test=test_set, env=some_env),
queue=queue, routing_key=r_key,
soft_time_limit=HOURS_2, task_time_limit=HOURS_2+500)
这是什么意思?您可以有代表每个工作环境的工人:
- Alpha,Beta,Charlie,Delta,Echo
因此,您在循环中执行一个任务,每个任务都与您想要的env一起执行。所有任务都会在工人身上运行,因为工人有独特的队列附带。因此,您需要有5名工人 5个队列。这是一个技巧。当然,您甚至可以在每个任务过程中都包含线程并进行更多。