使用AWS Glue ETL将拼花地板文件从S3加载到AWS RDS需要非常长的时间



我正在编写一个Glue ETL脚本,从S3读取一个镶木地板文件(大约2GB(并将其上传到RDS。然而,运行时间超过约14小时。有什么解决方案可以让它更快吗?

我读到自定义JDBC驱动程序非常慢,某些转换无法并行化,但我不确定这有多准确

这是一个经过编辑的示例代码(注意:这个脚本是使用Glue studio创建和编辑的(:

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue import DynamicFrame
import boto3
import re
# def sparkSqlQuery(glueContext, query, mapping, transformation_ctx) -> DynamicFrame:
#     for alias, frame in mapping.items():
#         frame.toDF().createOrReplaceTempView(alias)
#     result = spark.sql(query)
#     return DynamicFrame.fromDF(result, glueContext, transformation_ctx)

args = getResolvedOptions(sys.argv, ["JOB_NAME"])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args["JOB_NAME"], args)
# Script generated for node farmlands_raw.fact_inventory
s3_client = boto3.client("s3")
result = s3_client.list_objects(
Bucket="XXXX",
Prefix=f"XXXX",
Delimiter="/",
)
dates = []
for o in result.get("CommonPrefixes"):
prefix = o.get("Prefix")
dates.append(
int(re.search(r"d{8}", prefix).group())
)
latest_date = max(dates)
print(latest_date)
node1661227531364 = (
glueContext.create_dynamic_frame.from_catalog(
database="XXXX",
table_name="XXXXX",
push_down_predicate = f"upload_date = {latest_date}",
transformation_ctx="XXXXX",
)
)

# # Script generated for node SQL
# SqlQuery9 = """
# select * from fact_inventory limit 10
# """
# SQL_node1661227556612 = sparkSqlQuery(
#     glueContext,
#     query=SqlQuery9,
#     mapping={"fact_inventory": node1661227531364},
#     transformation_ctx="SQL_node1661227556612",
# )
# Script generated for node Apply Mapping
ApplyMapping_node1661227598286 = ApplyMapping.apply(
frame=node1661227531364,
mappings=[
.... redacted 
],
transformation_ctx="ApplyMapping_node1661227598286",
)
# Script generated for node admin_ 579445444291_rds_connector
# admin_579445444291_rds_connector_node1661227607272 = (
#     glueContext.write_dynamic_frame.from_options(
#         frame=ApplyMapping_node1661227598286,
#         connection_type="custom.jdbc",
#         connection_options={
#             "tableName": "fact_inventory_staging_temp",
#             "dbTable": "fact_inventory_staging_temp",
#             "connectionName": "qu_farmlands_raw_merged",
#         },
#         transformation_ctx="admin_579445444291_rds_connector_node1661227607272",
#     )
# )
glueContext.write_dynamic_frame.from_options(
frame=ApplyMapping_node1661227598286,
connection_type="custom.jdbc",
connection_options={
"tableName": "xxxx",
"dbTable": "xxxx",
"connectionName": "xxxx",
},
transformation_ctx="xxxx",
)
job.commit()

您可以尝试在动态帧创建中使用此选项:additional_options = {"useS3ListImplementation":True}

问候,

最新更新