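I am very new to Dagster and cannot find the answer to my question in the docs.
I have 2 solids: one yields tuples (str, str) parsed from an XML file, and the other just consumes the tuples and stores objects in the database with the corresponding fields set. However, I am running into the error "Core compute for solid returned an output multiple times".
I am pretty sure I have made a fundamental mistake in my design. Could someone explain how to design this pipeline the right way, or point me to the chapter in the docs that explains this error?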
@solid(output_defs=[OutputDefinition(Tuple, 'classification_data')])
def extract_classification_from_file(context, xml_path: String) -> Tuple:
    context.log.info(f"start")
    root = ET.parse(xml_path).getroot()
    for code_node in root.findall('definition-item'):
        context.log.info(f"{code_node.find('classification-symbol').text} {code_node.find('definition-title').text}")
        yield Output((code_node.find('classification-symbol').text, code_node.find('definition-title').text), 'classification_data')

@solid()
def load_classification(context, classification_data):
    cls = CPCClassification.objects.create(code=classification_data[0], description=classification_data[1]).save()

@pipeline
def define_classification_pipeline():
    load_classification(extract_classification_from_file())
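I looked through the Dagster codebase for your error, which I found here. It confirmed what I had read in the tutorial: "Output names must be unique".
Given that you are declaring the Output inside a for loop and receiving this error, the likely cause is that the Output name is not unique: every iteration yields an output with the same name, 'classification_data'.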
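To make the reasoning concrete, here is a simplified model of that uniqueness rule in plain Python. It is only a sketch and not Dagster's actual implementation: `DuplicateOutputError`, `check_unique_outputs`, and both emitter functions are hypothetical names invented for this illustration, and the sample rows are made up.

```python
from typing import Iterable, Iterator, Tuple


class DuplicateOutputError(Exception):
    """Hypothetical stand-in for the invariant error raised by the framework."""


def check_unique_outputs(
    outputs: Iterable[Tuple[str, object]]
) -> Iterator[Tuple[str, object]]:
    """Pass (name, value) pairs through, rejecting any repeated output name."""
    seen = set()
    for name, value in outputs:
        if name in seen:
            raise DuplicateOutputError(f"output {name!r} returned multiple times")
        seen.add(name)
        yield name, value


def emit_same_name(rows):
    # Mirrors the question: every loop iteration reuses one output name.
    for row in rows:
        yield "classification_data", row


def emit_unique_names(rows):
    # Each yielded value gets its own output name.
    for index, row in enumerate(rows):
        yield f"output_{index}", row


rows = [("A01", "first title"), ("A21", "second title")]  # made-up sample data

unique_ok = list(check_unique_outputs(emit_unique_names(rows)))

try:
    list(check_unique_outputs(emit_same_name(rows)))
    duplicate_rejected = False
except DuplicateOutputError:
    duplicate_rejected = True
```

With two rows, the second `yield` in `emit_same_name` reuses the name 'classification_data' and is rejected, which is the same shape as the failure in the question.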
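UPDATE: Following your outreach to the Dagster team by opening an issue, I tested the idea of creating outputs dynamically at runtime. It works fine if you define the dynamic code outside of an @solid. I did find that when I built the dynamic data inside an @solid, intending to use its output as solid config input for a subsequent @solid, the subsequent @solid did not pick up the updated structure. The result was a dagster.core.errors.DagsterInvariantViolationError.
Below is the code I used to validate yielding outputs dynamically at runtime when the dynamic data generation happens outside of a solid. I am guessing this might be a bit of an anti-pattern, though perhaps not yet, if Dagster has not reached the maturity level to handle the scenario you posed. Also note that I have not handled doing something with all of the generated Output objects.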
"""dagit -f dynamic_output_at_runtime.py -n dynamic_output_at_runtime"""
import random

from dagster import (
    Output,
    OutputDefinition,
    execute_pipeline,
    pipeline,
    solid,
    SystemComputeExecutionContext
)

# Create some dynamic OutputDefinition list for each execution
start = 1
stop = 100
limit = random.randint(1, 10)
random_set_of_ints = {random.randint(start, stop) for _ in range(limit)}

output_defs_runtime = [OutputDefinition(
    name=f'output_{num}') for num in random_set_of_ints]


@solid(output_defs=output_defs_runtime)
def ints_for_all(context: SystemComputeExecutionContext):
    # Yield one uniquely named Output per generated integer
    for num in random_set_of_ints:
        out_name = f"output_{num}"
        context.log.info(f"output object name: {out_name}")
        yield Output(num, out_name)


@pipeline
def dynamic_output_at_runtime():
    x = ints_for_all()
    print(x)


if __name__ == '__main__':
    result = execute_pipeline(dynamic_output_at_runtime)
    assert result.success
Each time I re-run this pipeline, it yields a different set of outputs:
python dynamic_output_at_runtime.py
_ints_for_all_outputs(output_56=<dagster.core.definitions.composition.InvokedSolidOutputHandle object at 0x7fb899cea160>, output_8=<dagster.core.definitions.composition.InvokedSolidOutputHandle object at 0x7fb899cea198>, output_58=<dagster.core.definitions.composition.InvokedSolidOutputHandle object at 0x7fb899cea1d0>, output_35=<dagster.core.definitions.composition.InvokedSolidOutputHandle object at 0x7fb899cea208>)
2019-11-27 08:33:32 - dagster - DEBUG - dynamic_output_at_runtime - a1273816-16b0-439b-ae32-dbd819f65b9a - PIPELINE_START - Started execution of pipeline "dynamic_output_at_runtime".
2019-11-27 08:33:32 - dagster - DEBUG - dynamic_output_at_runtime - a1273816-16b0-439b-ae32-dbd819f65b9a - ENGINE_EVENT - Executing steps in process (pid: 9456)
event_specific_data = {"metadata_entries": [["pid", null, ["9456"]], ["step_keys", null, ["{'ints_for_all.compute'}"]]]}
2019-11-27 08:33:32 - dagster - DEBUG - dynamic_output_at_runtime - a1273816-16b0-439b-ae32-dbd819f65b9a - STEP_START - Started execution of step "ints_for_all.compute".
solid = "ints_for_all"
solid_definition = "ints_for_all"
step_key = "ints_for_all.compute"
2019-11-27 08:33:32 - dagster - INFO - system - a1273816-16b0-439b-ae32-dbd819f65b9a - output object name: output_56
solid = "ints_for_all"
solid_definition = "ints_for_all"
step_key = "ints_for_all.compute"
2019-11-27 08:33:32 - dagster - DEBUG - dynamic_output_at_runtime - a1273816-16b0-439b-ae32-dbd819f65b9a - STEP_OUTPUT - Yielded output "output_56" of type "Any". (Type check passed).
event_specific_data = {"intermediate_materialization": null, "step_output_handle": ["ints_for_all.compute", "output_56"], "type_check_data": [true, "output_56", null, []]}
solid = "ints_for_all"
solid_definition = "ints_for_all"
step_key = "ints_for_all.compute"
2019-11-27 08:33:32 - dagster - INFO - system - a1273816-16b0-439b-ae32-dbd819f65b9a - output object name: output_8
solid = "ints_for_all"
solid_definition = "ints_for_all"
step_key = "ints_for_all.compute"
2019-11-27 08:33:32 - dagster - DEBUG - dynamic_output_at_runtime - a1273816-16b0-439b-ae32-dbd819f65b9a - STEP_OUTPUT - Yielded output "output_8" of type "Any". (Type check passed).
event_specific_data = {"intermediate_materialization": null, "step_output_handle": ["ints_for_all.compute", "output_8"], "type_check_data": [true, "output_8", null, []]}
solid = "ints_for_all"
solid_definition = "ints_for_all"
step_key = "ints_for_all.compute"
2019-11-27 08:33:32 - dagster - INFO - system - a1273816-16b0-439b-ae32-dbd819f65b9a - output object name: output_58
solid = "ints_for_all"
solid_definition = "ints_for_all"
step_key = "ints_for_all.compute"
2019-11-27 08:33:32 - dagster - DEBUG - dynamic_output_at_runtime - a1273816-16b0-439b-ae32-dbd819f65b9a - STEP_OUTPUT - Yielded output "output_58" of type "Any". (Type check passed).
event_specific_data = {"intermediate_materialization": null, "step_output_handle": ["ints_for_all.compute", "output_58"], "type_check_data": [true, "output_58", null, []]}
solid = "ints_for_all"
solid_definition = "ints_for_all"
step_key = "ints_for_all.compute"
2019-11-27 08:33:32 - dagster - INFO - system - a1273816-16b0-439b-ae32-dbd819f65b9a - output object name: output_35
solid = "ints_for_all"
solid_definition = "ints_for_all"
step_key = "ints_for_all.compute"
2019-11-27 08:33:32 - dagster - DEBUG - dynamic_output_at_runtime - a1273816-16b0-439b-ae32-dbd819f65b9a - STEP_OUTPUT - Yielded output "output_35" of type "Any". (Type check passed).
event_specific_data = {"intermediate_materialization": null, "step_output_handle": ["ints_for_all.compute", "output_35"], "type_check_data": [true, "output_35", null, []]}
solid = "ints_for_all"
solid_definition = "ints_for_all"
step_key = "ints_for_all.compute"
2019-11-27 08:33:32 - dagster - DEBUG - dynamic_output_at_runtime - a1273816-16b0-439b-ae32-dbd819f65b9a - STEP_SUCCESS - Finished execution of step "ints_for_all.compute" in 2.17ms.
event_specific_data = {"duration_ms": 2.166192003642209}
solid = "ints_for_all"
solid_definition = "ints_for_all"
step_key = "ints_for_all.compute"
2019-11-27 08:33:32 - dagster - DEBUG - dynamic_output_at_runtime - a1273816-16b0-439b-ae32-dbd819f65b9a - ENGINE_EVENT - Finished steps in process (pid: 9456) in 3.11ms
event_specific_data = {"metadata_entries": [["pid", null, ["9456"]], ["step_keys", null, ["{'ints_for_all.compute'}"]]]}
2019-11-27 08:33:32 - dagster - DEBUG - dynamic_output_at_runtime - a1273816-16b0-439b-ae32-dbd819f65b9a - PIPELINE_SUCCESS - Finished execution of pipeline "dynamic_output_at_runtime".
I hope this helps!
What I am doing here is unsupported behavior, as officially stated in the issue.
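One way to stay within the "one value per output name" rule is to collect every parsed pair into a list and emit that list once, instead of yielding inside the loop. The sketch below shows only the parsing part in plain Python so it can run without Dagster. The function name `extract_classification_pairs`, the use of an XML string rather than a file path, and the sample document are assumptions made for illustration; the element names come from the question.

```python
import xml.etree.ElementTree as ET
from typing import List, Tuple


def extract_classification_pairs(xml_text: str) -> List[Tuple[str, str]]:
    """Collect every (symbol, title) pair so the caller can emit them as one value."""
    root = ET.fromstring(xml_text)
    pairs = []
    for code_node in root.findall("definition-item"):
        # findtext returns the default when the child element is missing
        symbol = code_node.findtext("classification-symbol", default="")
        title = code_node.findtext("definition-title", default="")
        pairs.append((symbol, title))
    return pairs


# Made-up sample document using the element names from the question.
SAMPLE_XML = """
<definitions>
  <definition-item>
    <classification-symbol>A01B</classification-symbol>
    <definition-title>Soil working</definition-title>
  </definition-item>
  <definition-item>
    <classification-symbol>A01C</classification-symbol>
    <definition-title>Planting</definition-title>
  </definition-item>
</definitions>
"""

pairs = extract_classification_pairs(SAMPLE_XML)
```

Inside the first solid, the list would then be emitted a single time, for example as `Output(pairs, 'classification_data')`, and `load_classification` would loop over the list to create one database row per pair. The trade-off is that the whole file is held in memory before anything is stored.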