我创建了一个胶合作业,将S3(CSV文件)的数据复制到RedShift。它可以正常工作并填充所需的表。
但是,在此过程中,我需要在此过程中清除表格。
我正在寻找一种将此清除添加到胶水过程中的方法。任何建议将不胜感激。
谢谢。
您可以更改胶水脚本以执行" preaction"如下所述,插入之前:
https://aws.amazon.com/premiumsupport/knowledge-center/sql-commands-redshift-glue-job/
datasink4 = glueContext.write_dynamic_frame.from_jdbc_conf(frame
= datasource0, catalog_connection = "test_red", connection_options = {"preactions":"truncate table target_table;","dbtable": "target_table", "database": "redshiftdb"}, redshift_tmp_dir = 's3://s3path', transformation_ctx = "datasink4")
例如,对于我的脚本,它主要是基于默认值,我在最后一个数据链接之前插入了一个新的数据链接(我已经用{themings}}替换了我的一些deatils):
## @type: DataSink
## @args: [catalog_connection = "redshift-data-live", connection_options = {"dbtable": "{DBTABLE}", "database": "{DBNAME}"}, redshift_tmp_dir = TempDir, transformation_ctx = "datasink4"]
## @return: datasink4
## @inputs: [frame = dropnullfields3]
datasink4 = glueContext.write_dynamic_frame.from_jdbc_conf(frame = dropnullfields3, catalog_connection = "redshift-data-live", connection_options = {"preactions":"truncate table {TABLENAME};","dbtable": "{SCHEMA.TABLENAME}", "database": "{DB}"}, redshift_tmp_dir = args["TempDir"], transformation_ctx = "datasink4")
## @type: DataSink
## @args: [catalog_connection = "redshift-data-live", connection_options = {"dbtable": "{SCHEMA.TABLENAME}", "database": "{DB}"}, redshift_tmp_dir = TempDir, transformation_ctx = "datasink4"]
## @return: datasink5
## @inputs: [frame = datasink4]
datasink5 = glueContext.write_dynamic_frame.from_jdbc_conf(frame = datasink4, catalog_connection = "redshift-data-live", connection_options = {"dbtable": "{SCHEMA.TABLENAME}", "database": "{DB}"}, redshift_tmp_dir = args["TempDir"], transformation_ctx = "datasink5")
job.commit()
链接@frobinrobin提供的链接已过时,我尝试了很多次,即使您提供了错误的语法,也将跳过预反应语句,并带有重复的行(插入操作确实确实如此)执行!)
尝试以下操作:
只需替换语法 上面链接到glueContext.write_dynamic_frame_from_jdbc_conf()
的glueContext.write_dynamic_frame.from_jdbc_conf()
将有效!
至少在我的情况下,这对我有帮助(AWS胶合作业只需插入红移而不执行截断表操作)
您是否看过胶水书签?这是保持高水位并仅与S3一起使用的功能。我不是100%确定的,但可能需要分区才能到位。
您需要修改胶水提供的自动生成的代码。使用SPARK JDBC连接连接到RedShift并执行清除查询。
在红移VPC中旋转胶水容器;指定胶合作业中的连接,以获取红移集群的访问。
希望这会有所帮助。
您可以使用SPARK/PYSPARK DATABRICKS库在表的截断表之后进行附加(这比覆盖层更好):
preactions = "TRUNCATE table <schema.table>"
df.write
.format("com.databricks.spark.redshift")
.option("url", redshift_url)
.option("dbtable", redshift_table)
.option("user", user)
.option("password", readshift_password)
.option("aws_iam_role", redshift_copy_role)
.option("tempdir", args["TempDir"])
.option("preactions", preactions)
.mode("append")
.save()
您可以在此处查看Databricks文档