我们如何执行SQL语句(如…'调用store_proc();')在Redshift通过PySpark Glue ETL作业利用目录连接?我想从Glue Catalog connection传递Redshift连接详细信息(主机,用户,密码)。
我理解'write_dynamic_frame'选项,但我不确定如何仅对Redshift服务器执行SQL语句。
glueContext.write_dynamic_frame.from_jdbc_conf (frame=data_frame, catalog_connection="Redshift_Catalog_Conn", connection_options = {"preactions":"call stored_prod();","dbtable":"public.table1","database": "admin"}, redshift_tmp_dir="s3://glue_etl/")
据我所知,您想从Glue ETL作业中调用RedShift中的存储过程。一种方法如下:在Redshift中执行存储过程的一种更简单的方法如下:
post_query="begin; CALL sp_procedure1(); end;"
datasink = glueContext.write_dynamic_frame.from_jdbc_conf(frame = mydf,
catalog_connection = "redshift_connection",
connection_options = {"dbtable": "my_table", "database": "dev","postactions":post_query},
redshift_tmp_dir = 's3://tempb/temp/' transformation_ctx = "datasink")
另一个更复杂的解决方案是在应用程序代码中运行SQL查询。
- 通过Glue连接建立到RedShift集群的连接。使用JDBC选项在Glue中创建动态框架
my_conn_options = { "url": "jdbc:redshift://host:port/redshift-database-name", "dbtable": "redshift-table-name", "user": "username", "password": "password", "redshiftTmpDir": args["TempDir"], "aws_iam_role": "arn:aws:iam::account id:role/role-name" } df = glueContext.create_dynamic_frame_from_options("redshift", my_conn_options)
- 为了执行存储过程,我们将使用Spark SQL。所以首先将Glue动态框架转换为Spark DF。
spark_df=df.toDF()
spark_df.createOrReplaceTempView("CUSTOM_TABLE_NAME")
spark.sql('call store_proc();')
你在RedShift中的存储过程应该有返回值,这些返回值可以写入变量。