Dagster 在另一个实体中使用一个实体的资产具体化(包括可重现的代码)



假设我有两个实体。第一个执行一些计算并将文件写入磁盘。第二个实体获取该文件并对其执行其他操作,但它需要其文件系统路径才能打开它。我可以使用两个yield(一个用于AssetMaterialization,另一个用于strOutput)并明确地将Output放在第二个可靠的调用中:

from dagster import (AssetKey, AssetMaterialization, EventMetadataEntry,
Output, execute_pipeline, pipeline, solid)
@solid
def yield_asset(context):
yield AssetMaterialization(
asset_key=AssetKey('my_dataset'),
description='Persisted result to storage',
metadata_entries=[
EventMetadataEntry.text('Text-based metadata for this event',
label='text_metadata'),
EventMetadataEntry.fspath('/path/to/data/on/filesystem'),
EventMetadataEntry.url('http://mycoolsite.com/url_for_my_data',
label='dashboard_url'),
],
)
yield Output('/path/to/data/on/filesystem')

@solid
def print_asset_path(context, asset_path: str):
# do stuff with `asset_path`
context.log.info(asset_path)

@pipeline
def some_pipeline():
asset_path = yield_asset()
print_asset_path(asset_path)

if __name__ == "__main__":
result = execute_pipeline(some_pipeline)

这工作正常,您应该在日志(2021-03-16 13:23:29 - dagster - INFO - system - 366248ec-6a83-462f-b62f-9fb2514f6f80 - print_asset_path - /path/to/data/on/filesystem)和dagit中的AssetMaterialization中收到信息消息。

但是,这有点不方便,因为我需要使用所需的文件系统路径显式生成Output。是否可以以及如何引用第二个实体中的AssetMaterialization,并直接使用其属性?

类似的东西(不起作用):

@solid
def print_asset_path(context):
asset_path = context.assets.get_asset_by_key(`my_key`).fspath
# do stuff with `asset_path`
context.log.info(asset_path)

您提供的代码是目前在 Dagster 中完成此操作的最佳方式。

如果在实体本身执行之前已知 fspath,那么这两个问题(尚未实现)中概述的方向可能会提供更优雅的解决方案:

  • https://github.com/dagster-io/dagster/issues/3894
  • https://github.com/dagster-io/dagster/issues/3895

最新更新