我创建了一个测试管道,但它在中途失败了。我想以编程方式重新执行它:从失败的那个步骤开始,继续执行其后的步骤,而不重复之前已经成功的步骤。
from dagster import DagsterInstance, execute_pipeline, pipeline, solid, reexecute_pipeline
from random import random
# Dagster instance that records the runs used below (both the original run
# and the re-execution must use this same instance).
# NOTE(review): ephemeral() is intended for test contexts; runs recorded on it
# presumably do not outlive this process, so the parent run must have been
# executed earlier in the same script — confirm.
instance: DagsterInstance = DagsterInstance.ephemeral()
@solid
def step1(context, data):
    """Produce the seed data for the pipeline.

    Returns a 2-tuple ``(numbers, labels)``: ``numbers`` is ``[0, 1, ..., 9]``
    and ``labels`` is ``['a0', 'a1', ..., 'a9']``.

    ``'a' + i`` with an int ``i`` raises ``TypeError``, so each index is
    converted with ``str`` first. Both values are returned as lists rather
    than a ``range`` and a generator expression: a generator is lazy (the
    error would only surface in whichever later step consumed it) and cannot
    be pickled, which matters when step outputs are persisted so that a later
    re-execution can reuse them.

    ``context`` and ``data`` are not used by this body.
    NOTE(review): the pipeline calls ``step1()`` without wiring ``data``, so
    it presumably has to be supplied through ``run_config`` — confirm.
    """
    numbers = list(range(10))
    labels = ['a' + str(i) for i in numbers]
    return numbers, labels
@solid
def step2(context, step1op):
    """Apply simulated noise and compute a ``(v - 1) / v`` ratio per value.

    ``step1op`` is the ``(numbers, labels)`` pair produced by ``step1``.
    Each number is scaled by a random factor in ``[1.0, 1.1)``; for every
    noisy value ``v`` the ratio ``(v - 1) / v`` is computed.

    Returns ``(pairs, labels)``: ``pairs`` is a list of ``(v, ratio)``
    tuples and ``labels`` is passed through unchanged.

    A value of exactly 0 has no defined ratio and gets ``nan`` instead of
    raising ``ZeroDivisionError``. step1's first number is 0 and
    multiplicative noise keeps it 0, so raising would make this step fail on
    every run, including any re-execution. Using ``nan`` keeps ``pairs``
    the same length as ``labels``.
    """
    numbers, labels = step1op
    # simulation of noise: scale each value by a factor in [1.0, 1.1)
    noisy = [value * (1 + 0.1 * random()) for value in numbers]
    ratios = [(value - 1) / value if value else float('nan') for value in noisy]
    # A list (not a one-shot ``zip`` iterator) can be read more than once by
    # downstream code.
    return list(zip(noisy, ratios)), labels
@solid
def step3(context, step2op):
    """Final step: unpack step2's ``(pairs, labels)`` output and return it.

    The ``...`` below stands in for processing elided from the example; as
    written, the step returns its input pair unchanged after checking that it
    unpacks into exactly two items.
    """
    pairs, labels = step2op
    ...  # elided processing placeholder
    return (pairs, labels)
# Placeholder from the example: replace with the real run config.
# NOTE(review): taken literally, `{...}` evaluates to a *set* containing
# Ellipsis, not a dict, so it is not a usable run config. It presumably needs
# to supply step1's unwired `data` input (the pipeline calls step1() with no
# arguments) and a persistent storage setting so that re-execution can reuse
# earlier step outputs — confirm against the Dagster version in use.
run_config = {...}
@pipeline
def inputs_pipeline():
    """Wire the three solids into a linear chain: step1 -> step2 -> step3."""
    step1_output = step1()
    step2_output = step2(step1_output)
    step3(step2_output)
要以编程方式重新执行管道的一部分,首先需要获取一个可用的父运行(parent run)的 ID:
# Pick the parent run to re-execute from.
# NOTE(review): assumes get_runs() lists the most recent run first, and that
# the failed run was executed earlier in this same process against `instance`
# (e.g. execute_pipeline(inputs_pipeline, run_config=run_config,
# instance=instance, raise_on_error=False)) — confirm both.
runs = instance.get_runs()
if not runs:
    # Fail with a clear message instead of a bare IndexError from runs[0].
    raise RuntimeError(
        'No runs recorded on this Dagster instance; execute the pipeline '
        'first so there is a parent run to re-execute from.'
    )
parent_run_id = runs[0].run_id
然后重新执行管道,只列出需要重跑的步骤(失败的步骤及其下游步骤):
# Re-run only the failed step and the step after it. 'step1.compute' is left
# out, so step1 is not executed again; its output is expected to come from the
# parent run identified by parent_run_id.
result = reexecute_pipeline(
    inputs_pipeline,
    parent_run_id=parent_run_id,
    step_keys_to_execute=['step2.compute', 'step3.compute'],
    run_config=run_config,
    instance=instance,
)