我有一个自己编写的 Glue 脚本和一个存储在 Glue 目录中的 JDBC 连接。我不知道如何使用 PySpark 从存储在 RDS 中的 MySQL 数据库中执行 select 语句,我的 JDBC 连接指向该数据库。我还使用粘附爬虫来推断我有兴趣查询的 RDS 表的架构。如何使用 WHERE 子句查询 RDS 数据库?
我已经浏览了DynamicFrameReader和GlueContext类的文档,但似乎都没有指出我正在寻找的方向。
这取决于你想做什么。例如,如果要执行select * from table where <conditions>
,则有两个选项:
假设您创建了一个爬网程序,并在 AWS Glue 作业上插入了源,如下所示:
# Read data from database
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "db", table_name = "students", redshift_tmp_dir = args["TempDir"])
- AWS Glue
# Select the needed fields
selectfields1 = SelectFields.apply(frame = datasource0, paths = ["user_id", "full_name", "is_active", "org_id", "org_name", "institution_id", "department_id"], transformation_ctx = "selectfields1")
filter2 = Filter.apply(frame = selectfields1, f = lambda x: x["org_id"] in org_ids, transformation_ctx="filter2")
- PySpark + AWS Glue
# Change DynamicFrame to Spark DataFrame
dataframe = DynamicFrame.toDF(datasource0)
# Create a view
dataframe.createOrReplaceTempView("students")
# Use SparkSQL to select the fields
dataframe_sql_df_dim = spark.sql("SELECT user_id, full_name, is_active, org_id, org_name, institution_id, department_id FROM assignments WHERE org_id in (" + org_ids + ")")
# Change back to DynamicFrame
selectfields = DynamicFrame.fromDF(dataframe_sql_df_dim, glueContext, "selectfields2")