我正在使用kedro.extras.datasets.pandas.SQLTableDataSet,并希望使用pandas中的chunk_size参数。然而,当运行管道时,表被视为生成器,而不是pd.dataframe((.
您将如何在管道中使用chunk_size?
我的目录:
table_name:
type: pandas.SQLTableDataSet
credentials: redshift
table_name : rs_table_name
layer: output
save_args:
if_exists: append
schema: schema.name
chunk_size: 1000
查看最新的pandas
文档,实际要使用的kwarg
是chunksize
,而不是chunk_size
。请参阅https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.to_sql.html.由于kedro
只包装save_args
并将其传递给pd.DataFrame.to_sql
,因此需要匹配:
def _save(self, data: pd.DataFrame) -> None:
try:
data.to_sql(**self._save_args)
except ImportError as import_error:
raise _get_missing_module_error(import_error) from import_error
except NoSuchModuleError as exc:
raise _get_sql_alchemy_missing_error() from exc
编辑:一旦您在管道中完成了这项工作,文档就会显示设置了chunksize
的pandas.DataFrame.read_sql
将返回类型Iterator[DataFrame]
。这意味着在您的节点函数中,您应该对输入进行迭代(并在适当的情况下进行相应的注释(,例如:
def my_node_func(input_dfs: Iterator[pd.DataFrame], *args):
for df in input_dfs:
...
这适用于最新版本的pandas
。然而,我注意到pandas
正在调整API,因此read_csv
和chunksize
集合从pandas>=1.2
返回ContextManager
,因此我预计read_sql
中也会发生这种变化。