执行Glue ETL中的存储过程



我们如何执行SQL语句(如…'调用store_proc();')在Redshift通过PySpark Glue ETL作业利用目录连接?我想从Glue Catalog connection传递Redshift连接详细信息(主机,用户,密码)。

我理解'write_dynamic_frame'选项,但我不确定如何仅对Redshift服务器执行SQL语句。

glueContext.write_dynamic_frame.from_jdbc_conf (frame=data_frame, catalog_connection="Redshift_Catalog_Conn", connection_options = {"preactions":"call stored_prod();","dbtable":"public.table1","database": "admin"}, redshift_tmp_dir="s3://glue_etl/")

据我所知,您想从Glue ETL作业中调用RedShift中的存储过程。一种方法如下:在Redshift中执行存储过程的一种更简单的方法如下:

post_query="begin; CALL sp_procedure1(); end;" 
datasink = glueContext.write_dynamic_frame.from_jdbc_conf(frame = mydf, 
catalog_connection = "redshift_connection", 
connection_options = {"dbtable": "my_table", "database": "dev","postactions":post_query}, 
redshift_tmp_dir = 's3://tempb/temp/' transformation_ctx = "datasink")

另一个更复杂的解决方案是在应用程序代码中运行SQL查询。

  1. 通过Glue连接建立到RedShift集群的连接。使用JDBC选项在Glue中创建动态框架
my_conn_options = {  
"url": "jdbc:redshift://host:port/redshift-database-name",
"dbtable": "redshift-table-name",
"user": "username",
"password": "password",
"redshiftTmpDir": args["TempDir"],
"aws_iam_role": "arn:aws:iam::account id:role/role-name"
}
df = glueContext.create_dynamic_frame_from_options("redshift", my_conn_options)
  1. 为了执行存储过程,我们将使用Spark SQL。所以首先将Glue动态框架转换为Spark DF。
spark_df=df.toDF()
spark_df.createOrReplaceTempView("CUSTOM_TABLE_NAME")
spark.sql('call store_proc();')

你在RedShift中的存储过程应该有返回值,这些返回值可以写入变量。

相关内容

  • 没有找到相关文章

最新更新