如何创建目录条目列表并将其作为输入传入Kedro Pipeline

  • 本文关键字:Pipeline Kedro 创建目录 列表 kedro
  • 更新时间 :
  • 英文 :


我正试图从我创建的目录文件中获取数据集列表,并将它们作为单个节点的输入传入,以组合它们,并最终使用kedro气流插件在气流上运行管道

这适用于kedro运行的cli,但似乎在气流中失败,我不确定原因:

#my_pipeline/pipeline.py
def create_pipeline(**kwargs):
conf_loader = ConfigLoader(['conf/base'])
conf_catalog = conf_loader.get('catalog-a*')
datasets = [key for key, value in conf_catalog.items()] 
return Pipeline([
node(
func=combine_data,
inputs=datasets,
outputs="combined_data",
name="combined_data"
),
...#other nodes
])

我在气流上遇到的错误如下所示:断开dag:给定的配置路径不存在或不是有效目录:"conf/base">

这当然是一个Kedro配置加载程序错误,但我似乎不明白为什么只有在通过气流运行管道时才会出现错误。从我读到的代码混合API是不建议的。在数据集列表中,这是正确的方式吗?

编辑

我的目录基本上是Sql查询数据集的列表:

dataset_1:
type: pandas.SQLQueryDataSet
sql: select * from my_table where created_at >= '2018-12-21 16:00:00' and partner_id=1
credentials: staging_sql
dataset_2:
type: pandas.SQLQueryDataSet
sql: select * from my_table where created_at >= '2019-08-15 11:55:00' and partner_id=2
credentials: staging_sql

我认为它可能会失败,因为kedro run是从它的根目录运行的,在那里它可以找到conf/base,但create_pipeline函数在my_pipeline目录下,所以kedro ConfigLoader找不到。我想我过去做这件事的另一种方法是,像这样通过catalog: DataCatalog

def create_pipeline(catalog: DataCatalog = None, * *kwargs) -> Pipeline:

然后你可以迭代或做:

datasets = catalog.datasets

相关内容

  • 没有找到相关文章