我是新来的dagster,我有一个困难的时间来整理这个。我必须在我的dagster管道中定义作业,并且我想将数据从一个作业中的op传递到另一个作业中的op
我的设置是这样的(简化的例子)
job1.py
@op()
def generate_num():
return 3
@op()
def increase_num(generate_num):
return generate_num + 1
@job()
def increment_up():
increase_num(generate_num))
job2.py
@op()
def decrease_num(generate_num)
generate_num - 1
@op()
def multiple_num(decrease_num)
decrease_num * 2
@job()
def get_multiple():
multiple_num(decrease_num())
从"generate_num"返回的值传递给job2.py。这样做是不是太离谱了?
有什么原因不能在第二个作业中重用generate_num
吗?像
from job1 import generate_num
@op()
def decrease_num(generate_num)
generate_num - 1
@op()
def multiple_num(decrease_num)
decrease_num * 2
@job()
def get_multiple():
multiple_num(decrease_num(generate_num()))
在《Dagster》中考虑这一点的典型方法是使用资产。即generate_num
在第一个作业中产生并在第二个作业中使用的值将驻留在任何作业运行范围之外的持久存储中。资产是持久存储的对象,如文件或表。
下面是两个作业共享一个资产的例子:
from dagster import Definitions, AssetSelection, asset, define_asset_job
@asset
def num():
return 3
@asset
def num_plus_one(num):
return num + 1
@asset
def num_multiplied(num):
return num + 2
defs = Definitions(
assets=[num, num_plus_one, num_multiplied],
jobs=[
define_asset_job("inc_job", AssetSelection.assets(num, num_plus_one)),
define_asset_job("multi_job", AssetSelection.assets(num_multiplied)),
],
)
运行第一个作业时,将为num
和num_plus_one
创建一个文件。当您运行第二个作业时,它将使用num
文件的内容来计算num_multiplied
。