Python芹菜在Envs上设置的函数集集,而无需重叠



尝试使用芹菜进行以下情况:

示例:

  • 具有一组不同的测试(每个测试都有一个dicts供使用)

  • 有5个不同的环境来运行这些测试。

我可以在5个EVN上同时执行一个测试(多处理)。现在试图制作芹菜。

问题是如何执行测试,它们不应重叠在一个Env中。

因此,一个env只能同时运行一个测试,但是与工人和队列的芹菜可以执行所有混搭。我尝试使用"帆布原始图",然后在循环中添加" test evns"。

喜欢:

tests_set_list =  [{test1}, {test2}, {test3}]
envs_list = [{alpha}, {bravo}, {charlie}, {delta}, {echo}]

变体1:在这种情况下

"""This loop create tasks for each env"""
for env_item in envs_list:
    for test_item in tests_set_list:
        r_key = 'tests_run.bunch_test_execute_task.{0}.{1}'.format(test_item['test_folder_name'],
                                                                   env_item['env_codename'])
        bunch_test_execute_task.apply_async(kwargs={'env_item': env_item,
                                                    'test_item': [test_item],
                                                    'tkn_branch': branch,
                                                    },
                                            queue='tests_run',
                                            routing_key=r_key)

变体2: -> typeError:'asyncresult'对象无法订阅在这种情况下,任务仅添加第一次迭代,然后停止。

""" This loop create env inst for each task. Works in group
    Works bad, did not add task in worker queue. 
"""
list_of_groups = []
for test_item in tests_set_list:
    tasks_group_by_env = []
    for env_item in synced_envs:
        r_key = 'tests_run.bunch_test_execute_env_q_task.{0}.{1}'.format(test_item['test_folder_name'],
                                                                          env_item['env_codename'])
        task = bunch_test_execute_env_q_task(env_item, [test_item], branch)
        tasks_group_by_env.append(task)
    groups = group(tasks_group_by_env)
    list_of_groups.append(groups)
for group_item in list_of_groups:
    # job = group(*group_item)
    job = group_item
    job.apply_async(queue='tests_run')

变体3: -> typeError:不可订购类型:float()> = str()但是无论如何,任务是在队列中添加的,但是以重叠的方式执行。

""" This loop use one test item and iter envs from list of synced envs to add groups """
list_of_groups = []
for test_item in tests_set_list:
    r_key = 'tests_run.bunch_test_execute_env_q_task.{0}.{1}'
    lazy_group = group(bunch_test_execute_env_q_task.apply_async(
          args=[env_item['env_codename']],
          kwargs={'env_item': env_item,
                  'test_item': [test_item],
                  'tkn_branch': branch,
                  },
          queue='tests_run',
          routing_key=r_key.format(test_item['pattern_folder_name'],
                                   env_item['env_codename']))for env_item in envs_list)
    # list_of_tasks.append(task)
    list_of_groups.append(lazy_group)
all_groups = group(*list_of_groups)
all_groups()

我想实现的摘要:每个测试将获得单独的测试env。为此,这是一个不同的工作:

job_1 = alpha(test1)
job_2 = bravo(test1)
job_3 = charlie(test1)
job_4 = delta(test1)
job_5 = echo(test1)
...
job_n+1 = alpha(test_n+1)
job_n+1 = bravo(test_n+1)
job_n+1 = charlie(test_n+1)
job_n+1 = delta(test_n+1)
job_n+1 = echo(test_n+1)

这似乎可以简单地通过为每个测试env添加单独的工作者来实现,因此它们永远不会彼此重叠,因为在单独的流程工作人员中运行,但是它并不优雅,因为环境可以不时改变。

简单的答案是使用队列:

PY上的样子: - 这只是为现有工人添加队列。如何增加工人 - 请参阅芹菜妖魔文档

app.control.add_consumer(
    queue='alpha',
    exchange = 'tests_run',
    exchange_type = 'direct',
    routing_key   = 'alpha.*',
    destination = ['alpha@tentacle'])

然后,当您要同时执行5个不同的任务而没有任何重叠(例如,使用5个diff envs)时,只需添加任务:

queue =" desipable_queue"

TestExecTasks().your_task.apply_async(
    args=[arg1,arg2],
    kwargs=dict(test=test_set, env=some_env),
    queue=queue, routing_key=r_key,
    soft_time_limit=HOURS_2, task_time_limit=HOURS_2+500)

这是什么意思?您可以有代表每个工作环境的工人:

  • Alpha,Beta,Charlie,Delta,Echo

因此,您在循环中执行一个任务,每个任务都与您想要的env一起执行。所有任务都会在工人身上运行,因为工人有独特的队列附带。因此,您需要有5名工人 5个队列。这是一个技巧。当然,您甚至可以在每个任务过程中都包含线程并进行更多。

相关内容

  • 没有找到相关文章

最新更新