如何在 Kedro 节点中使用 SQL Server 批量插入?



我正在使用Kedro管理数据管道,在最后一步中,我将一个巨大的csv文件存储在S3存储桶中,我需要将其加载回SQL Server。

我通常会使用批量插入来解决这个问题,但不太确定如何将其放入kedro模板中。这是目标表和 S3 存储桶,如catalog.yml

flp_test:
type: pandas.SQLTableDataSet
credentials: dw_dev_credentials
table_name: flp_tst
load_args:
schema: 'dwschema'
save_args:
schema: 'dwschema'
if_exists: 'replace'
bulk_insert_input:
type: pandas.CSVDataSet
filepath: s3://your_bucket/data/02_intermediate/company/motorbikes.csv
credentials: dev_s3

def insert_data(self, conn, csv_file_nm, db_table_nm):
qry = "BULK INSERT " + db_table_nm + " FROM '" + csv_file_nm + "' WITH (FORMAT = 'CSV')"
# Execute the query
cursor = conn.cursor()
success = cursor.execute(qry)
conn.commit()
cursor.close
  • 如何将csv_file_nm指向我的bulk_insert_inputS3 目录?
  • 有没有一种正确的方法可以间接访问dw_dev_credentials进行插入?

凯德罗的熊猫。SQLTableDataSet.html按原样使用 pandas.to_sql 方法。要按原样使用它,您需要将一个pandas.CSVDataSet到一个node中,然后写入目标pandas.SQLDataTable数据集,以便将其写入 SQL。如果你有Spark可用,这将比Pandas更快。

为了使用内置的BULK INSERT查询,我认为您需要定义一个自定义数据集。

相关内容

最新更新