从选项(从 rds - mysql)创建动态帧,提供带有 where 子句的自定义查询



我想从Aurora-rds mysql表在我的Glue作业中创建一个DynamicFrame。我可以使用自定义查询从我的 rds 表创建动态帧吗 - 有一个 where 子句? 我不想每次在我的 DynamicFrame 中读取整个表,然后稍后进行过滤。 查看了这个网站,但在这里或其他地方找不到任何选择,https://docs.aws.amazon.com/glue/latest/dg/aws-glue-programming-etl-connect.html

构造 JDBC 连接选项

connection_mysql5_options = { "url": "jdbc:mysql://:3306/db", "dbtable": "test", "用户": "管理员", "密码": "PWD"}

从 MySQL 5 读取 DynamicFrame

df_mysql5 = glueContext.create_dynamic_frame.from_options(connection_type="mysql", connection_options=connection_mysql5_options(

有没有办法给出一个 where 子句并说只从测试表中选择前 100 行,假设它有一个名为"id"的列,我想使用此查询获取:

从 ID <100 的测试中选择 *;

感谢任何帮助。谢谢!

抱歉,我会发表评论,但我没有足够的声誉。我能够使 Guillermo AMS 提供的解决方案在 AWS Glue 中工作,但它确实需要两个更改:

  • 无法识别"jdbc"格式(提供的错误是:"py4j.protocol.Py4JJavaError:调用 o79.load 时出错。 :java.lang.ClassNotFoundException:找不到数据源:jbdc。请在 http://spark.apache.org/third-party-projects.html"( 找到套餐-- 我不得不使用全名:"org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider">
  • 查询选项对我不起作用(提供的错误是:"py4j.protocol.Py4JJavaError:调用 o72.load 时出错。 :java.sql.SQLSyntaxErrorException: ORA-00911: invalid character"(,但幸运的是,"dbtable"选项支持传入表或子查询 - 即在查询周围使用括号。

在下面的解决方案中,我还围绕所需的对象和导入添加了一些上下文.
我的解决方案最终看起来像:

from awsglue.context import GlueContext
from pyspark.context import SparkContext
glue_context = GlueContext(SparkContext.getOrCreate())
tmp_data_frame = glue_context.spark_session.read
.format("org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider")
.option("url", jdbc_url)
.option("user", username)
.option("password", password)
.option("dbtable", "(select * from test where id<100)")
.load()

我能够提供自定义查询的方法是创建一个 Spark 数据帧并使用选项指定它: https://spark.apache.org/docs/2.3.0/sql-programming-guide.html#manually-specifying-options

然后使用上述类将该数据帧转换为动态帧: https://docs.aws.amazon.com/glue/latest/dg/aws-glue-api-crawler-pyspark-extensions-dynamic-frame.html

tmp_data_frame = spark.read.format("jbdc")
.option("url", jdbc_url)
.option("user", username)
.option("password", password)
.option("query", "select * from test where id<100")
.load()
dynamic_frame = DynamicFrame.fromDF(tmp_data_frame, glueContext)

最新更新