I have multiple text files in an S3 bucket that I read and process, so I defined a PartitionedDataSet in the Kedro data catalog as follows:
raw_data:
  type: PartitionedDataSet
  path: s3://reads/raw
  dataset: pandas.CSVDataSet
  load_args:
    sep: "\t"
    comment: "#"
In addition, I implemented this solution to pull all secrets, including the AWS keys, from a credentials file via environment variables.
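A minimal sketch of one way this can look, assuming a Kedro 0.16-style ProjectContext (the version the traceback below points at); the _get_config_credentials hook comes from KedroContext, while the project metadata and the dev_s3 entry name are placeholders I made up:

import os

from kedro.framework.context import KedroContext


class ProjectContext(KedroContext):
    project_name = "my-project"      # placeholder
    project_version = "0.16.6"       # placeholder

    def _get_config_credentials(self):
        # Start from whatever conf/<env>/credentials.yml already defines ...
        credentials = super()._get_config_credentials()
        # ... then overlay the AWS secrets taken from the environment.
        credentials["dev_s3"] = {
            "key": os.environ["AWS_ACCESS_KEY_ID"],
            "secret": os.environ["AWS_SECRET_ACCESS_KEY"],
        }
        return credentials

The catalog entry can then reference these secrets with a credentials: dev_s3 line, which PartitionedDataSet forwards to the underlying fsspec filesystem.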
Everything works fine when I run the pipeline locally with kedro run, but it breaks when I build a Docker image (using kedro-docker) and run the pipeline in the Docker environment with kedro docker run, supplying all the environment variables through the --docker-args option.
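A hedged sketch of that invocation (the variable values are placeholders, not my actual command):

kedro docker run --docker-args="--env AWS_ACCESS_KEY_ID=<key> --env AWS_SECRET_ACCESS_KEY=<secret>"

With that invocation I get the following error: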
Traceback (most recent call last):
  File "/usr/local/bin/kedro", line 8, in <module>
    sys.exit(main())
  File "/usr/local/lib/python3.7/site-packages/kedro/framework/cli/cli.py", line 724, in main
    cli_collection()
  File "/usr/local/lib/python3.7/site-packages/click/core.py", line 829, in __call__
    return self.main(*args, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/click/core.py", line 782, in main
    rv = self.invoke(ctx)
  File "/usr/local/lib/python3.7/site-packages/click/core.py", line 1259, in invoke
    return _process_result(sub_ctx.command.invoke(sub_ctx))
  File "/usr/local/lib/python3.7/site-packages/click/core.py", line 1066, in invoke
    return ctx.invoke(self.callback, **ctx.params)
  File "/usr/local/lib/python3.7/site-packages/click/core.py", line 610, in invoke
    return callback(*args, **kwargs)
  File "/home/kedro/kedro_cli.py", line 230, in run
    pipeline_name=pipeline,
  File "/usr/local/lib/python3.7/site-packages/kedro/framework/context/context.py", line 767, in run
    raise exc
  File "/usr/local/lib/python3.7/site-packages/kedro/framework/context/context.py", line 759, in run
    run_result = runner.run(filtered_pipeline, catalog, run_id)
  File "/usr/local/lib/python3.7/site-packages/kedro/runner/runner.py", line 101, in run
    self._run(pipeline, catalog, run_id)
  File "/usr/local/lib/python3.7/site-packages/kedro/runner/sequential_runner.py", line 90, in _run
    run_node(node, catalog, self._is_async, run_id)
  File "/usr/local/lib/python3.7/site-packages/kedro/runner/runner.py", line 213, in run_node
    node = _run_node_sequential(node, catalog, run_id)
  File "/usr/local/lib/python3.7/site-packages/kedro/runner/runner.py", line 221, in _run_node_sequential
    inputs = {name: catalog.load(name) for name in node.inputs}
  File "/usr/local/lib/python3.7/site-packages/kedro/runner/runner.py", line 221, in <dictcomp>
    inputs = {name: catalog.load(name) for name in node.inputs}
  File "/usr/local/lib/python3.7/site-packages/kedro/io/data_catalog.py", line 392, in load
    result = func()
  File "/usr/local/lib/python3.7/site-packages/kedro/io/core.py", line 213, in load
    return self._load()
  File "/usr/local/lib/python3.7/site-packages/kedro/io/partitioned_data_set.py", line 240, in _load
    raise DataSetError("No partitions found in `{}`".format(self._path))
kedro.io.core.DataSetError: No partitions found in `s3://reads/raw`
Note: the pipeline runs fine in the Docker environment if I move the files to a local directory, define the PartitionedDataSet on that local path, build the Docker image, and provide the environment variables through --docker-args. The problem only appears when the PartitionedDataSet points at S3.
The solution (at least in my case) was to also provide the AWS_DEFAULT_REGION environment variable in the kedro docker run command.
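For completeness, a hedged sketch of the working invocation (the key and region values are placeholders):

kedro docker run --docker-args="--env AWS_ACCESS_KEY_ID=<key> --env AWS_SECRET_ACCESS_KEY=<secret> --env AWS_DEFAULT_REGION=eu-west-1"

Presumably, without a region botocore inside the container cannot resolve the bucket properly, so the S3 listing comes back empty and PartitionedDataSet raises the "No partitions found" error instead of a credentials error.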