如何在kedro管道中使用kedro.extras.datasets.pandas.SQLTableDataSet的Ch



我正在使用kedro.extras.datasets.pandas.SQLTableDataSet,并希望使用pandas中的chunk_size参数。然而,当运行管道时,表被视为生成器,而不是pd.dataframe((.

您将如何在管道中使用chunk_size?

我的目录:

table_name:
type: pandas.SQLTableDataSet
credentials: redshift
table_name : rs_table_name
layer: output
save_args:
if_exists: append
schema: schema.name
chunk_size: 1000

查看最新的pandas文档,实际要使用的kwargchunksize,而不是chunk_size。请参阅https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.to_sql.html.由于kedro只包装save_args并将其传递给pd.DataFrame.to_sql,因此需要匹配:

def _save(self, data: pd.DataFrame) -> None:
try:
data.to_sql(**self._save_args)
except ImportError as import_error:
raise _get_missing_module_error(import_error) from import_error
except NoSuchModuleError as exc:
raise _get_sql_alchemy_missing_error() from exc

编辑:一旦您在管道中完成了这项工作,文档就会显示设置了chunksizepandas.DataFrame.read_sql将返回类型Iterator[DataFrame]。这意味着在您的节点函数中,您应该对输入进行迭代(并在适当的情况下进行相应的注释(,例如:

def my_node_func(input_dfs: Iterator[pd.DataFrame], *args):
for df in input_dfs:
...

这适用于最新版本的pandas。然而,我注意到pandas正在调整API,因此read_csvchunksize集合从pandas>=1.2返回ContextManager,因此我预计read_sql中也会发生这种变化。

相关内容

  • 没有找到相关文章

最新更新