如何获取当前管道项目的 URI



考虑以下管道:

example_gen = tfx.components.ImportExampleGen(input_base=_dataset_folder)
statistics_gen = tfx.components.StatisticsGen(examples=example_gen.outputs['examples'])
schema_gen = tfx.components.SchemaGen(
statistics=statistics_gen.outputs['statistics'],
infer_feature_shape=True)
transform = tfx.components.Transform(
examples=example_gen.outputs['examples'],
schema=schema_gen.outputs['schema'],
module_file=os.path.abspath('preprocessing_fn.py'))
_trainer_module_file = 'run_fn.py'
trainer = tfx.components.Trainer(
module_file=os.path.abspath(_trainer_module_file),
examples=transform.outputs['transformed_examples'],
transform_graph=transform.outputs['transform_graph'],
schema=schema_gen.outputs['schema'],
train_args=tfx.proto.TrainArgs(num_steps=10),
eval_args=tfx.proto.EvalArgs(num_steps=6),)

pusher = tfx.components.Pusher(
model=trainer.outputs['model'],
push_destination=tfx.proto.PushDestination(
filesystem=tfx.proto.PushDestination.Filesystem(
base_directory=_serving_model_dir)
)
)
components = [
example_gen,
statistics_gen,
schema_gen,
transform,
trainer,
pusher,
]
_pipeline_data_folder = './simple_pipeline_data'
pipeline = tfx.dsl.Pipeline(
pipeline_name='simple_pipeline',
pipeline_root=_pipeline_data_folder,
metadata_connection_config=tfx.orchestration.metadata.sqlite_metadata_connection_config(
f'{_pipeline_data_folder}/metadata.db'),
components=components)
tfx.orchestration.LocalDagRunner().run(pipeline)

现在,让我们假设一旦管道关闭,我想对工件做一些事情。我知道我可以像这样查询ML元数据:

import ml_metadata as mlmd
connection_config = pipeline.metadata_connection_config
store = mlmd.MetadataStore(connection_config)
print(store.get_artifact_types())

但是这样,我不知道哪些id属于当前管道。当然,我可以假设最大的id表示当前的管道构件,但在生产环境中,当多个执行可能试图并发地使用相同的元数据存储时,这不是一种实用的方法。

那么,问题是我如何才能计算出刚刚由当前执行创建的工件id ?

(更新)

为了澄清问题,考虑以下部分解决方案:

def get_latest_artifact(metadata_connection_config, pipeline_name: str, component_name: str, type_name: str):
with Metadata(metadata_connection_config) as metadata:
context = metadata.store.get_context_by_type_and_name('node', f'{pipeline_name}.{component_name}')
artifacts = metadata.store.get_artifacts_by_context(context.id)
artifact_type = metadata.store.get_artifact_type(type_name)
latest_artifact = max([a for a in artifacts if a.type_id == artifact_type.id],
key=lambda a: a.last_update_time_since_epoch)
artifact = types.Artifact(artifact_type)
artifact.set_mlmd_artifact(latest_artifact)
return artifact
sqlite_path = './pipeline_data/metadata.db'
metadata_connection_config = tfx.orchestration.metadata.sqlite_metadata_connection_config(sqlite_path)
examples_artifact = get_latest_artifact(metadata_connection_config, 'simple_pipeline',
'SchemaGen', 'Schema')

使用get_latest_artifact函数,我可以从特定管道获得特定类型的最新工件。即使两个管道(具有不同的名称)并发地创建新的工件,这也可以工作。但是当我试图提取"刚刚完成"的工件时,它就会失败。如果同一管道的多个实例并发地对存储进行更改,则使用。这是因为该函数接受管道名称作为输入参数(而不是管道的唯一ID)。

我正在寻找一个解决方案,无论有多少不同的(或相同的)管道与同一存储并发工作。在这一点上,我不确定是否可以做到这一点与MlMD。如果现在不能完成,我认为这是一个缺失的功能,一个非常重要的功能。

好的,这是我找到的解决方案。在定义管道的组件时,应该使用.with_id()方法并为组件提供一个自定义ID。这样你以后就可以找到它了。

这里有一个例子。假设我想要查找作为最近执行的管道的一部分生成的模式。

schema_gen = tfx.components.SchemaGen(
statistics=statistics_gen.outputs['statistics'],
infer_feature_shape=True).with_id('some_unique_id')
然后,我上面定义的函数可以这样使用:
def get_latest_artifact(metadata_connection_config, pipeline_name: str, component_name: str, type_name: str):
with Metadata(metadata_connection_config) as metadata:
context = metadata.store.get_context_by_type_and_name('node', f'{pipeline_name}.{component_name}')
artifacts = metadata.store.get_artifacts_by_context(context.id)
artifact_type = metadata.store.get_artifact_type(type_name)
latest_artifact = max([a for a in artifacts if a.type_id == artifact_type.id],
key=lambda a: a.last_update_time_since_epoch)
artifact = types.Artifact(artifact_type)
artifact.set_mlmd_artifact(latest_artifact)
return artifact
sqlite_path = './pipeline_data/metadata.db'
metadata_connection_config = tfx.orchestration.metadata.sqlite_metadata_connection_config(sqlite_path)
examples_artifact = get_latest_artifact(metadata_connection_config, 'simple_pipeline',
'some_unique_id', 'Schema')

最新更新