DynamicFrame.fromDF is very slow when writing to the database with glueContext.write_from_options()



I have a Glue job in which I need to read data from two tables in SQL Server, perform some joins/transformations, and write it back to another new/truncated table in SQL Server. The size of the data to be written is around 15 GB.

I tried the following two approaches and saw a large difference in performance. I am hoping to get the job done in under 10 minutes.

Approach 1 - about 17 minutes overall (read from SQL Server, transform, write to S3, read back from S3, write to SQL Server)

  • Read Spark DataFrames from SQL Server (approx. 3-5 seconds)
  • Perform transformations on the Spark DataFrames (approx. 5 seconds)
  • Write the data to temporary storage on S3 (approx. 8 minutes)
  • Read back from S3 into a DynamicFrame using glueContext.create_dynamic_frame.from_options()
  • Write to the SQL Server table using glueContext.write_from_options() (9 minutes)

Approach 2 - about 50 minutes overall (read from SQL Server, transform, write back to SQL Server)

  • Read Spark DataFrames from SQL Server (approx. 3-5 seconds)
  • Perform transformations on the Spark DataFrames (approx. 5 seconds)
  • Convert the Spark DataFrame to a DynamicFrame using DynamicFrame.fromDF()
  • Write to the SQL Server table using glueContext.write_from_options() (43 minutes)

I observed that the second approach takes far longer even though it avoids writing to and reading back from S3: converting the Spark DataFrame to a DynamicFrame and using that to write to SQL Server is much slower. In both cases the target table is truncated before the data is written to it. I had expected that by removing the S3 read/write I could finish the job in 10-12 minutes.

Am I missing something? Any suggestions?

Code template for Approach 1:

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.dynamicframe import DynamicFrame
from awsglue.job import Job
from pyspark.sql.types import *
from pyspark.sql.functions import *
import time
from py4j.java_gateway import java_import
## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
logger = glueContext.get_logger()

# STEP 1 -- READ DATA FROM TABLES INTO DATAFRAMES
# -----------------------------------------------
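# NOTE: the reads and joins that produce 'df' below are elided in the
# original post. A hypothetical read consistent with the timings above
# (jdbc_url and connection_properties are placeholder names) might be:
#   df = spark.read.jdbc(url=jdbc_url, table="dbo.source_table",
#                        properties=connection_properties)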
# STEP 2 -- PERFORM TRANSFORMATIONS IF ANY, AND WRITE TO DATALAKE - S3
#----------------------------------------------------------------------
df.write.mode("overwrite").csv("s3://<<bucket-name>>/temp_result")
# STEP 3 -- READ DATA FROM S3 INTO NEW DATAFRAMES
#------------------------------------------------
newdf = glueContext.create_dynamic_frame.from_options(
    connection_type="s3",
    connection_options={"paths": ["s3://<<bucket-name>>/temp_result"]},
    format="csv",
)
# STEP 4 -- TRUNCATE TARGET TABLE AS ITS A FULL REFRESH ALWAYS IN THE TARGET TABLE
#---------------------------------------------------------------------------------
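# 'conn' below is a JDBC connection whose setup is elided in the original
# post. Given the java_import at the top, a hypothetical setup via py4j
# might be:
#   java_import(sc._gateway.jvm, "java.sql.DriverManager")
#   conn = sc._gateway.jvm.DriverManager.getConnection(jdbc_url, user, password)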
cstmt = conn.prepareCall("TRUNCATE TABLE mytable_in_db")
results = cstmt.execute()
# STEP 5 -- WRITE TO TARGET TABLE FROM DATAFRAME
# ----------------------------------------------
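# 'connection_sqlserver_options' (elided in the original post) is assumed
# to hold the JDBC url/user/password/dbtable details for the target table.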
glueContext.write_from_options(frame_or_dfc=newdf, connection_type="sqlserver", connection_options=connection_sqlserver_options)
job.commit()

Code template for Approach 2:

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.dynamicframe import DynamicFrame
from awsglue.job import Job
from pyspark.sql.types import *
from pyspark.sql.functions import *
import time
from py4j.java_gateway import java_import
## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
logger = glueContext.get_logger()

# STEP 1 -- READ DATA FROM TABLES INTO DATAFRAMES
# -----------------------------------------------
# STEP 2 -- PERFORM TRANSFORMATIONS IF ANY AND STORE TO df
#----------------------------------------------------------------------
# 'df' contains the transformed data (the reads and joins are elided,
# as in the Approach 1 template)

# STEP 3 -- CONVERT SPARK DATAFRAME TO DYNAMIC DATAFRAME
#--------------------------------------------------------
newdf2 = DynamicFrame.fromDF(df, glueContext, "newdf2")
# STEP 4 -- TRUNCATE TARGET TABLE AS ITS A FULL REFRESH ALWAYS IN THE TARGET TABLE
#---------------------------------------------------------------------------------
# 'conn' is the same elided JDBC connection as in the Approach 1 template
cstmt = conn.prepareCall("TRUNCATE TABLE mytable_in_db")
results = cstmt.execute()
# STEP 5 -- WRITE TO TARGET TABLE FROM DATAFRAME
# ----------------------------------------------
glueContext.write_from_options(frame_or_dfc=newdf2, connection_type="sqlserver", connection_options=connection_sqlserver_options)
job.commit()

I am facing the same issue. Judging by my SQL Profiler / Activity Monitor, glueContext.create_dynamic_frame.from_options() appears to be the weak point.

I came to this conclusion after noticing the absence of open sessions on the target instance (RDS SQL Server) while Glue was fetching the dynamicFrame from the source instance (also RDS SQL Server).

The important thing to note is that this idle state appears when larger tables (1-2+ million records) are being retrieved from the source.

I suggest trying to bypass Glue's methods and using spark.read instead: create your DataFrame, perform all your transformations, and convert to a DynamicFrame only for the load (see the fuller sketch after the snippet below).

https://spark.apache.org/docs/latest/sql-data-sources-jdbc.html

df = (
    spark.read.format("jdbc")
    .option("url", "jdbc:sqlserver://")
    .option("user", "")
    .option("password", "")
    .option("dbtable", "")
    .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver")
    .load()
)
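
A minimal end-to-end sketch of that pattern follows. Everything in angle brackets, the partition column "id", its bounds, and connection_sqlserver_options are placeholders/assumptions; partitionColumn, lowerBound, upperBound and numPartitions are the standard Spark JDBC options that split the read into parallel queries.

from awsglue.dynamicframe import DynamicFrame

# Read straight from SQL Server with the Spark JDBC reader, in parallel.
df = (
    spark.read.format("jdbc")
    .option("url", "jdbc:sqlserver://<host>:1433;databaseName=<db>")
    .option("user", "<user>")
    .option("password", "<password>")
    .option("dbtable", "<source-table>")
    .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver")
    .option("partitionColumn", "id")  # assumes a numeric key column exists
    .option("lowerBound", "1")
    .option("upperBound", "2000000")
    .option("numPartitions", "8")
    .load()
)

# ... joins/transformations on df go here ...

# Convert to a DynamicFrame only for the final load.
dyf = DynamicFrame.fromDF(df, glueContext, "dyf")
glueContext.write_from_options(
    frame_or_dfc=dyf,
    connection_type="sqlserver",
    connection_options=connection_sqlserver_options,
)

This way only the load goes through Glue's writer; the read and all transformations stay in plain Spark.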
