看过DataEngineerOne的视频:如何使用参数范围自动生成管道我想自动化模拟电子电路的管道。我想在带通滤波器的多个中心频率上进行网格搜索,并为每个频率运行simulate
管道。
在管道注册表中,网格搜索参数被传递给create_pipeline()
函数的kwargs
。
# pipeline_registry.py
"""Project pipelines."""
from typing import Dict
from kedro.pipeline import Pipeline, pipeline
from kedro.config import ConfigLoader
from my_project.pipelines import grid_search as grd
def register_pipelines() -> Dict[str, Pipeline]:
"""Register the project's pipelines.
Returns:
A mapping from a pipeline name to a ``Pipeline`` object.
"""
conf_path = r'C:Usersuserpython_projectsmy_projectconf'
conf_loader = ConfigLoader(conf_source=conf_path, env="local")
gs_params = conf_loader.get('**/grid_search.yml')
gridsearch_pipeline = grd.create_pipeline(**gs_params['grid_search'])
return {
"__default__":gridsearch_pipeline,
"grd" : gridsearch_pipeline,
}
在管道的pipeline.py
文件中,我在网格搜索参数上循环,并在每个循环中创建一个具有自己名称空间的新管道。
# grid_searchpipeline.py
from kedro.pipeline import Pipeline, node, pipeline
from my_project.pipelines.grid_search.nodes import grid_search
from numpy import arange
def create_pipeline(**kwargs) -> Pipeline:
fmin = kwargs['f_central_min']
fmax = kwargs['f_central_max']
fstp = kwargs['f_central_step']
cfs = arange(fmin, fmax, fstp) # central frequencies
for cf in cfs:
def generate_freq_node(cf):
def generated():
return cf
return generated
pipeline_key = f'pipeline_{cf}'
pipe_gridsearch = Pipeline([
node(
func=generate_freq_node(cf),
inputs=None,
outputs=pipeline_key,
),
node(
func=grid_search,
inputs=["msmts_table", "params:grid_search"],
outputs=None,
name="node_grid_search",
),
])
pipe_complete = pipeline(
pipe=pipe_gridsearch,
inputs={"msmts_table": "msmts_table"},
parameters={"params:grid_search": pipeline_key},
namespace=pipeline_key,
)
return pipe_complete
在grid_search.yml
文件中:
grid_search:
f_central_min : 100000
f_central_max : 500000
f_central_step: 100000
最后是grid_searchnodes.py
:
from typing import Dict, Any
from pandas import DataFrame
from numpy import arange
def grid_search(msmts_table: DataFrame,
grd_params: Dict[str, Any],
) -> DataFrame:
"""Repeatedly run the simulation pipeline for all the parameters passed in params."""
print(grd_params)
当我现在kedro run --pipeline grd
时,我得到以下错误:
ValueError: Pipeline input(s) {'params:pipeline_400000'} not found in the DataCatalog
这实际上是我所期望的,因为catalog.yml
中没有指定数据集。然而,在上面提到的视频中,这种方法是有效的,DataEngineerOne说参数pipeline_key
被保存为memroy数据集。这是一个从未有过的kedro版本中的改变吗,还是我在这里错过了什么?我是否必须在目录中将所有参数指定为单独的数据集?这会以某种方式破坏这种自动化的全部意义。。。
在此期间,您可能已经找到了解决方案,否则我建议您在pipeline.py
代码的最后一部分尝试以下修改:
-
pipe_complete += pipeline(
而不是pipe_complete = pipeline(
在循环中添加管道 -
parameters={"grid_search": pipeline_key},
而不是parameters={"params:grid_search": pipeline_key},