是否有更好的方法来优化下面的笔记本?目前运行一次需要2分20秒。我怎样才能提高性能?任何建议都将不胜感激。谢谢。
环境:- 中型spark pool (8 vCores/64gb), 3-30个节点,10个executor
- ADLSG2 premium(固态硬盘)
设置环境变量
# Notebook-wide settings: default file format and the name placed after
# the "@" in every abfss:// URI built by the path helpers.
fileFormat = "parquet"
environment = "mydatalake"
函数-设置加载源parquet文件的路径
tblName = ""
# Blank placeholders that share their names with fn_PathSource's parameters.
fldrName = dbName = filePrefix = ""
# Create the function
def fn_PathSource(fldrName, dbName, tblName, fileFormat, filePrefix):
    """Build the source text of a ``spark.read.load(...)`` call for one file.

    The result is a string of Python code, not a DataFrame; the notebook
    passes it to ``eval`` to perform the actual read.

    Args:
        fldrName: Text placed before the ``@`` in the abfss URI.
        dbName: Source database name; used both as a folder and as the
            leading part of the file name.
        tblName: Source table name; used both as a folder and as the
            trailing part of the file name.
        fileFormat: File extension, also emitted as the ``format=`` argument.
        filePrefix: Text placed between ``dbName`` and ``tblName`` in the
            file name (for example ``"_dbo_"``).

    Returns:
        str: ``spark.read.load('abfss://<fldr>@<environment>.dfs.core.windows.net/sources/<db>/<tbl>/<db><prefix><tbl>.<fmt>', format='<fmt>')``
    """
    # `environment` is the notebook-level global defined in the settings cell.
    # NOTE(review): the arguments are interpolated unescaped into code that is
    # later eval'd, so only trusted literals must ever be passed in.
    uri = (
        f"abfss://{fldrName}@{environment}.dfs.core.windows.net/sources"
        f"/{dbName}/{tblName}/{dbName}{filePrefix}{tblName}.{fileFormat}"
    )
    return f"spark.read.load('{uri}', format='{fileFormat}')"
函数-设置表数据在数据库中存储的路径
# Blank placeholders that share their names with fn_Path's parameters.
tblName = fldrName = dbName = ""
# Create the function
def fn_Path(fldrName, dbName, tblName):
    """Return the abfss:// folder URI under which a table's data is stored.

    Args:
        fldrName: Text placed before the ``@`` in the abfss URI.
        dbName: Database name, used as the first folder level.
        tblName: Table name, used as the second folder level.

    Returns:
        str: ``abfss://<fldr>@<environment>.dfs.core.windows.net/<db>/<tbl>/``
        (always ends with a slash).
    """
    # `environment` is the notebook-level global defined in the settings cell.
    return f"abfss://{fldrName}@{environment}.dfs.core.windows.net/{dbName}/{tblName}/"
函数-获取最新版本的记录
import hashlib
from pyspark.sql.functions import md5, concat_ws,col
# Blank placeholders that share their names with fn_ReadLatestVrsn's parameters.
uniqueId = versionId = tblName = ""
# Create the function
def fn_ReadLatestVrsn(uniqueId, versionId, tblName):
    """Keep only the newest version of every record in temp view ``tblName``.

    For each ``uniqueId`` value, the rows whose ``versionId`` equals the
    maximum ``versionId`` of that key are kept; exact duplicate rows are
    collapsed. The temp view ``tblName`` is replaced with the result.

    Compared with the earlier GROUP BY + MD5 self-join implementation:
      * the table is processed in one windowed pass instead of an aggregate,
        two hash computations and a join;
      * rows are matched on the real column values, so two different
        (id, version) pairs can no longer be confused because their
        separator-less string concatenations happen to be equal;
      * the intermediate ``<tblName>Max`` / ``Latest`` / ``Hash`` temp views
        are no longer registered.

    Args:
        uniqueId: Name of the column identifying a record.
        versionId: Name of the column ordering the versions of a record.
        tblName: Name of an existing temp view; it is replaced in place.

    Returns:
        DataFrame selecting everything from the replaced view: the original
        columns plus the ``HashKey`` column (kept for schema compatibility).
    """
    max_col = "__max_version__"  # scratch column, dropped before returning

    # `<=>` is null-safe equality: a key whose versions are all NULL keeps
    # its rows, which is what the previous hash comparison did as well.
    df_latest = spark.sql(
        f"SELECT DISTINCT * FROM ("
        f"SELECT t1.*, MAX({versionId}) OVER (PARTITION BY {uniqueId}) AS {max_col} "
        f"FROM {tblName} t1"
        f") ranked WHERE {versionId} <=> {max_col}"
    ).drop(max_col)

    # Preserve the HashKey column that earlier versions exposed to consumers.
    df_latest = df_latest.withColumn(
        "HashKey",
        md5(concat_ws("", col(uniqueId).cast("string"), col(versionId).cast("string"))),
    )

    df_latest.createOrReplaceTempView(tblName)
    return spark.sql(f"SELECT * FROM {tblName}")
用源表数据加载数据帧
# Load every source parquet file into its own DataFrame.
# fn_PathSource returns the *text* of a spark.read.load(...) call, and eval
# executes that text. NOTE(review): eval is only acceptable here because all
# arguments are hard-coded literals; never feed it external input.
# Tables use the "_dbo_" file-name infix, views (v_*) use "_".
DF_tblBitSize = eval(fn_PathSource("silver","MineDB","tblBitSize","parquet","_dbo_"))
DF_tblDailyReport = eval(fn_PathSource("silver","MineDB","tblDailyReport","parquet","_dbo_"))
DF_tblDailyReportHole = eval(fn_PathSource("silver","MineDB","tblDailyReportHole","parquet","_dbo_"))
DF_tblDailyReportHoleActivity = eval(fn_PathSource("silver","MineDB","tblDailyReportHoleActivity","parquet","_dbo_"))
DF_tblDailyReportHoleActivityHours = eval(fn_PathSource("silver","MineDB","tblDailyReportHoleActivityHours","parquet","_dbo_"))
DF_tblDailyReportShift = eval(fn_PathSource("silver","MineDB","tblDailyReportShift","parquet","_dbo_"))
DF_tblDrill = eval(fn_PathSource("silver","MineDB","tblDrill","parquet","_dbo_"))
DF_tblDrillType = eval(fn_PathSource("silver","MineDB","tblDrillType","parquet","_dbo_"))
DF_tblEmployee = eval(fn_PathSource("silver","MineDB","tblEmployee","parquet","_dbo_"))
DF_tblHole = eval(fn_PathSource("silver","MineDB","tblHole","parquet","_dbo_"))
DF_tblMineProject = eval(fn_PathSource("silver","MineDB","tblMineProject","parquet","_dbo_"))
DF_tblShift = eval(fn_PathSource("silver","MineDB","tblShift","parquet","_dbo_"))
DF_tblUnit = eval(fn_PathSource("silver","MineDB","tblUnit","parquet","_dbo_"))
DF_tblUnitType = eval(fn_PathSource("silver","MineDB","tblUnitType","parquet","_dbo_"))
DF_tblWorkSubCategory = eval(fn_PathSource("silver","MineDB","tblWorkSubCategory","parquet","_dbo_"))
DF_tblWorkSubCategoryType = eval(fn_PathSource("silver","MineDB","tblWorkSubCategoryType","parquet","_dbo_"))
DF_v_Dashboards_CompanyContracts= eval(fn_PathSource("silver","MineDB","v_Dashboards_CompanyContracts","parquet","_"))
DF_v_DailyReportShiftDrillers = eval(fn_PathSource("silver","MineDB","v_DailyReportShiftDrillers","parquet","_"))
DF_v_ActivityCharges = eval(fn_PathSource("silver","MineDB","v_ActivityCharges","parquet","_"))
将数据帧转换为可在SQL中使用的临时视图
# Register each source DataFrame as a temp view named after its source
# table/view so the SQL cells below can reference it by name.
DF_tblBitSize.createOrReplaceTempView("tblBitSize")
DF_tblDailyReport.createOrReplaceTempView("tblDailyReport")
DF_tblDailyReportHole.createOrReplaceTempView("tblDailyReportHole")
DF_tblDailyReportHoleActivity.createOrReplaceTempView("tblDailyReportHoleActivity")
DF_tblDailyReportHoleActivityHours.createOrReplaceTempView("tblDailyReportHoleActivityHours")
DF_tblDailyReportShift.createOrReplaceTempView("tblDailyReportShift")
DF_tblDrill.createOrReplaceTempView("tblDrill")
DF_tblDrillType.createOrReplaceTempView("tblDrillType")
DF_tblEmployee.createOrReplaceTempView("tblEmployee")
DF_tblHole.createOrReplaceTempView("tblHole")
DF_tblMineProject.createOrReplaceTempView("tblMineProject")
DF_tblShift.createOrReplaceTempView("tblShift")
DF_tblUnit.createOrReplaceTempView("tblUnit")
DF_tblUnitType.createOrReplaceTempView("tblUnitType")
DF_tblWorkSubCategory.createOrReplaceTempView("tblWorkSubCategory")
# The next two statements were fused onto a single line, which is a syntax error.
DF_tblWorkSubCategoryType.createOrReplaceTempView("tblWorkSubCategoryType")
DF_v_Dashboards_CompanyContracts.createOrReplaceTempView("v_Dashboards_CompanyContracts")
DF_v_DailyReportShiftDrillers.createOrReplaceTempView("v_DailyReportShiftDrillers")
DF_v_ActivityCharges.createOrReplaceTempView("v_ActivityCharges")
加载最新数据到视图
当源系统表中的现有记录被更新(或发生软删除)时,Azure Data Factory通过创建增量parquet文件来捕获该更改。创建新记录时也会发生同样的情况。在合并过程中,所有增量文件被合并到一个parquet文件中。对于已更新(或已软删除)的现有记录,合并将创建该记录的两个版本,并附加最新版本。如果您要查询合并的parquet文件,您将看到一个重复的记录。因此,为了只查看该记录的最新版本,我们需要删除以前的版本。此函数将确保我们查看所有记录的最新版本。
**特别注意:这个逻辑对于没有软删除记录的表是不必要的(例如,没有LastModDateTime或ActiveInd列的表),因此,我们不对这些表应用此函数。**
# Reduce each versioned table to the latest version of every record.
# Each call replaces the temp view named in the third argument and rebinds
# the DF_* variable to a DataFrame over that replaced view.
# tblDrillType, tblUnit, tblUnitType, tblWorkSubCategory and the three v_*
# views are not passed through fn_ReadLatestVrsn here.
DF_tblBitSize = fn_ReadLatestVrsn("BitSizeID","LastModDateTime","tblBitSize")
DF_tblDailyReport = fn_ReadLatestVrsn("DailyReportID","LastModDateTime","tblDailyReport")
DF_tblDailyReportHole = fn_ReadLatestVrsn("DailyReportHoleID","LastModDateTime","tblDailyReportHole")
DF_tblDailyReportHoleActivity = fn_ReadLatestVrsn("DailyReportHoleActivityID","LastModDateTime","tblDailyReportHoleActivity")
DF_tblDailyReportHoleActivityHours = fn_ReadLatestVrsn("DailyReportHoleActivityHoursID","LastModDateTime","tblDailyReportHoleActivityHours")
DF_tblDailyReportShift = fn_ReadLatestVrsn("DailyReportShiftID","LastModDateTime","tblDailyReportShift")
DF_tblDrill = fn_ReadLatestVrsn("DrillID","LastModDateTime","tblDrill")
DF_tblEmployee = fn_ReadLatestVrsn("EmployeeID","LastModDateTime","tblEmployee")
DF_tblHole = fn_ReadLatestVrsn("HoleID","LastModDateTime","tblHole")
DF_tblMineProject = fn_ReadLatestVrsn("MineProjectID","LastModDateTime","tblMineProject")
DF_tblShift = fn_ReadLatestVrsn("ShiftID","LastModDateTime","tblShift")
DF_tblWorkSubCategoryType = fn_ReadLatestVrsn("WorkSubCategoryTypeID","LastModDateTime","tblWorkSubCategoryType")
CTE_UnitConversion
%%sql
-- Unit lookup restricted to unit types 'Distance' and 'Depth', with a
-- Conversion factor that defaults to 1 when the source value is NULL.
-- A synthetic row (UnitID = -1, 'Unknown', Conversion = 1) is appended so
-- that facts whose unit id was defaulted to -1 still find a match.
-- UNION (not UNION ALL) also removes duplicate rows from the result.
CREATE OR REPLACE TEMP VIEW CTE_UnitConversion AS
(
SELECT
u.UnitID
,ut.UnitType
,u.UnitName
,u.UnitAbbr
,COALESCE(CAST(u.Conversion AS FLOAT),1) AS Conversion
FROM
tblUnit u
INNER JOIN tblUnitType ut
ON u.UnitTypeID = ut.UnitTypeID
AND ut.UnitType IN ('Distance','Depth')
UNION
SELECT
-1 AS UnitID
,'Unknown' AS UnitType
,'Unknown' AS UnitName
,'Unknown' AS UnitAbbr
,1 AS Conversion
)
CTE_Dashboards_BaseData
%%sql
-- Base row set: one row per contract / daily report / shift combination,
-- enriched with drill, drill type, shift, supervisor and driller names.
-- Most measure and descriptive columns are emitted as typed NULLs.
-- The WHERE clause filters on DR.Status, so contract rows without a
-- matching tblDailyReport row (NULL status) are removed even though the
-- join to tblDailyReport is written as a LEFT JOIN.
CREATE OR REPLACE TEMP VIEW CTE_Dashboards_BaseData AS
(
SELECT
CC.ContractID,
CC.ProjectID,
CAST(DR.ReportDate AS DATE) AS ReportDate,
D.DrillID,
CAST(D.DrillName AS STRING) AS DrillName,
DT.DrillTypeID,
CAST(DT.DrillType AS STRING) AS DrillType,
CAST(NULL AS STRING) AS HoleName,
CAST(S.ShiftName AS STRING) AS ShiftName,
STRING(CONCAT(E.LastName,' ',E.FirstName)) AS Supervisor,
CAST(DRSD.Drillers AS STRING) AS Driller,
CAST(NULL AS FLOAT) AS TotalMeterage,
CAST(NULL AS FLOAT) AS Depth,
CAST(NULL AS STRING) AS DepthUnit,
CAST(NULL AS FLOAT) AS ManHours,
CAST(NULL AS FLOAT) AS Payrollhours,
CAST(NULL AS FLOAT) AS ActivityHours,
CAST(NULL AS FLOAT) AS EquipmentHours,
CAST(NULL AS FLOAT) AS Quantity,
CAST(NULL AS STRING) AS Category,
CAST(NULL AS STRING) AS SubCategory,
CAST(NULL AS STRING) AS HoursType,
CAST(NULL AS STRING) AS BitSize,
CAST(DRS.DailyReportShiftID AS BIGINT) AS DailyReportShiftID,
CAST(DRS.ShiftID AS INT) AS ShiftID,
CAST(NULL AS TIMESTAMP) AS CompleteDateTime,
CAST(NULL AS STRING) AS HoleCompletionStatus,
CAST(NULL AS STRING) AS Notes,
CAST(NULL AS INT) AS HoleID,
CAST(NULL AS FLOAT) AS DistanceFrom,
CAST(NULL AS FLOAT) AS DistanceTo,
CAST(NULL AS STRING) AS DistanceFromToUnit,
CAST(NULL AS FLOAT) AS Distance,
CAST(NULL AS STRING) AS DistanceUnit,
CAST(NULL AS STRING) AS FluidUnit,
CAST(NULL AS FLOAT) AS FluidVolume,
CAST(NULL AS STRING) AS UID,
CAST(NULL AS FLOAT) AS MaxDepth,
CAST(NULL AS FLOAT) AS Penetration,
CAST(NULL AS FLOAT) AS Charges,
CAST(DR.Status AS STRING) AS Status,
CAST(DRS.LastModDateTime AS TIMESTAMP) AS LastModDateTime
FROM
v_Dashboards_CompanyContracts CC
LEFT JOIN tblDailyReport DR ON CC.ContractID = DR.ContractID AND CC.ProjectID = DR.ProjectID
LEFT JOIN tblDailyReportShift DRS ON DR.DailyReportID = DRS.DailyReportID
LEFT JOIN tblShift S ON DRS.ShiftID = S.ShiftID
LEFT JOIN tblDrill D ON DR.DrillID = D.DrillID
LEFT JOIN tblDrillType DT ON D.DrillTypeID = DT.DrillTypeID
LEFT JOIN tblEmployee E ON DRS.SupervisorID = E.EmployeeID
LEFT JOIN v_DailyReportShiftDrillers DRSD ON DRS.DailyReportShiftID = DRSD.DailyReportShiftID
WHERE
DR.Status <> 'Deleted'
)
CTE_DailyReportHoleActivityManHours
%%sql
-- Total man-hours per hole activity: sums HoursAsFloat over the rows of
-- tblDailyReportHoleActivityHours whose ActiveInd is 'Y'.
CREATE OR REPLACE TEMP VIEW CTE_DailyReportHoleActivityManHours AS
(
SELECT
DailyReportHoleActivityID
,SUM(HoursAsFloat) AS ManHours
FROM
tblDailyReportHoleActivityHours
WHERE
ActiveInd = 'Y'
GROUP BY
DailyReportHoleActivityID
)
活动费用
%%sql
-- Activity-level rows: joins the base data to active holes and active hole
-- activities (ActiveInd = 'Y' on both), then attaches bit size, the three
-- unit lookups, sub-category type, charges and summed man-hours.
-- HoursType is 'Non-Billable' when there are neither charges nor billable
-- counts, or when the activity is flagged Billable = 'N'; otherwise the
-- defined rate name if present, else the work sub-category name.
-- MaxDepth is a windowed MAX of DistanceTo per hole and sub-category,
-- ordered by report date, shift, sequence number, create time and
-- DistanceTo; it is only populated when WorkCategoryId = 1.
-- NOTE(review): DU is joined but only DU.UnitID is selected.
CREATE OR REPLACE TEMP VIEW SECTION_1 AS
(
SELECT
BD.ContractID
,BD.ProjectID
,CAST(ReportDate AS DATE) AS ReportDate
,DrillID
,DRHA.Depth
,DPU.UnitAbbr AS DepthUnit
,DPU.UnitID AS DepthUnitID
,DRHAMH.ManHours
,DRHA.ActivityHoursAsFloat AS ActivityHours
,WSC.WorkSubCategoryName AS Category
,WSCT.TypeName AS SubCategory
,CASE
WHEN (COALESCE(AC.Charges,0) = 0 AND COALESCE(AC.BillableCount, 0) = 0) OR DRHA.Billable='N' THEN 'Non-Billable'
WHEN AC.DefinedRateName IS NOT NULL AND DRHA.Billable <> 'N' THEN AC.DefinedRateName
ELSE WSC.WorkSubCategoryName
END AS HoursType
,BS.BitSizeID AS BitSizeID
,BS.BitSize
,DRHA.BitID AS BitID
,BD.DailyReportShiftID
,DRHA.Notes
,H.HoleID
,DRHA.DistanceFrom
,DRHA.DistanceTo
,DFU.UnitAbbr AS DistanceFromToUnit
,DFU.UnitID AS DistanceFromToUnitID
,DRHA.Distance
,DU.UnitID AS DistanceUnitID
,CASE
WHEN WSC.WorkCategoryId = 1 THEN MAX(COALESCE(DRHA.DistanceTo, 0)) OVER ( PARTITION BY H.HoleID, WSC.WorkSubCategoryName ORDER BY H.HoleID, ReportDate, BD.ShiftID, DRHA.SequenceNumber, DRHA.CreateDateTime, DRHA.DistanceTo)
ELSE NULL
END AS MaxDepth
,CASE
WHEN WSC.WorkCategoryId = 1 THEN DRHA.Penetration
ELSE 0
END AS Penetration
,COALESCE(AC.Charges,0) AS Charges
,BD.Status
,H.MineProjectID
,CAST(DRHA.LastModDateTime AS TIMESTAMP) AS LastModDateTime
FROM
CTE_Dashboards_BaseData BD
INNER JOIN tblDailyReportHole DRH ON BD.DailyReportShiftID = DRH.DailyReportShiftID
INNER JOIN tblDailyReportHoleActivity DRHA ON DRH.DailyReportHoleID = DRHA.DailyReportHoleID
INNER JOIN tblWorkSubCategory WSC ON DRHA.WorkSubCategoryID = WSC.WorkSubCategoryID
LEFT JOIN tblHole H ON DRH.HoleID = H.HoleID
LEFT JOIN tblBitSize BS ON DRHA.BitSizeID = BS.BitSizeID
LEFT JOIN tblUnit DPU ON DRHA.DepthUnitID = DPU.UnitID
LEFT JOIN tblUnit DFU ON DRHA.DistanceFromToUnitID = DFU.UnitID
LEFT JOIN tblUnit DU ON DRHA.DistanceUnitID = DU.UnitID
LEFT JOIN tblWorkSubCategoryType WSCT ON DRHA.TypeID = WSCT.WorkSubCategoryTypeID
LEFT JOIN v_ActivityCharges AC ON DRHA.DailyReportHoleActivityID = AC.DailyReportHoleActivityID
LEFT JOIN CTE_DailyReportHoleActivityManHours DRHAMH ON DRHA.DailyReportHoleActivityID = DRHAMH.DailyReportHoleActivityID
WHERE
DRH.ActiveInd = 'Y'
AND DRHA.ActiveInd = 'Y'
)
创建FACT_Activity表
# Build the FACT_Activity DataFrame from SECTION_1.
# Inner query (TBL1): replaces NULL keys with -1, NULL labels with
# 'UNKNOWN'/'Unknown' and NULL measures with 0, and adds contract attributes
# (location, currency, company ids, ReportApprovalRequired) and the active
# mine project.
# Outer query: upper-cases/trims the text columns and divides the depth and
# distance measures by the unit Conversion factor to produce the *Meters
# columns.
# The two INNER JOINs to CTE_UnitConversion drop any row whose depth or
# distance-from/to unit id is absent from that view.
# NOTE(review): COALESCE mixes DATE/TIMESTAMP values with the string literals
# '01/01/1900' and '1900/01/01 00:00:00' - confirm the resulting column types
# are what the target table expects.
df = spark.sql("""
SELECT
ReportDate
,DrillingCompanyID
,MiningCompanyID
,DrillID
,ProjectID
,ContractID
,LocationID
,HoleID
,DailyReportShiftId
,MineProjectID
,BitID
,TRIM(UPPER(BitSize)) AS BitSize
,-1 AS TimesheetId
,CurrencyID
,TRIM(UPPER(Category)) AS Category
,TRIM(UPPER(SubCategory)) AS SubCategory
,TRIM(UPPER(HoursType)) AS HoursType
,TRIM(UPPER(Notes)) AS Notes
,ApprovalStatus
,Depth AS Depth
,(Depth/COALESCE(Depth.Conversion,1)) AS DepthMeters
,Manhours
,ActivityHours
,DistanceFrom
,DistanceTo
,Distance
,Penetration
,(DistanceFrom/Distance.Conversion) AS DistanceFromMeters
,(DistanceTo/Distance.Conversion) AS DistanceToMeters
,(Distance/Distance.Conversion) AS DistanceMeters
,(Penetration/Distance.Conversion) AS PenetrationMeters
,DepthUnitID
,DistanceFromToUnitID
,Charges
,LastModDateTime
,ReportApprovalRequired
FROM
(
SELECT
COALESCE(CAST(ReportDate AS DATE),'01/01/1900') AS ReportDate
,COALESCE(DrillingCompanyID,-1) AS DrillingCompanyID
,COALESCE(MiningCompanyID,-1) AS MiningCompanyID
,COALESCE(DrillID,-1) AS DrillID
,COALESCE(C.ProjectID, -1) AS ProjectID
,COALESCE(C.ContractID,-1) AS ContractID
,COALESCE(C.LocationID,-1) AS LocationID
,COALESCE(HoleID,-1) AS HoleID
,COALESCE(DailyReportShiftID,-1) AS DailyReportShiftId
,COALESCE(MP.MineProjectID,-1) AS MineProjectID
,COALESCE(BitID,-1) AS BitID
,COALESCE(BitSize,'UNKNOWN') AS BitSize
,COALESCE(DepthUnitID,-1) AS DepthUnitID
,COALESCE(DistanceFromToUnitID,-1) AS DistanceFromToUnitID
,COALESCE(DistanceUnitID,-1) AS DistanceUnitID
,COALESCE(C.CurrencyID,-1) AS CurrencyID
,COALESCE(Category,'Unknown') AS Category
,COALESCE(SubCategory,'UNKNOWN') AS SubCategory
,COALESCE(HoursType,'UNKNOWN') AS HoursType
,SUBSTRING(Notes,0,250) AS Notes
,COALESCE(U.Status,'Unknown') AS ApprovalStatus
,COALESCE(Depth,0) AS Depth
,COALESCE(Manhours,0) AS Manhours
,COALESCE(ActivityHours,0) AS ActivityHours
,COALESCE(DistanceFrom,0) AS DistanceFrom
,COALESCE(DistanceTo,0) AS DistanceTo
,COALESCE(Distance,0) AS Distance
,COALESCE(Penetration,0) AS Penetration
,COALESCE(Charges,0) AS Charges
,COALESCE(CAST(U.LastModDateTime AS TIMESTAMP),'1900/01/01 00:00:00') AS LastModDateTime
,C.ReportApprovalRequired
FROM
SECTION_1 U
LEFT JOIN v_Dashboards_CompanyContracts C ON U.ContractID = C.ContractID AND COALESCE(U.ProjectID,-1) = C.ProjectID
LEFT JOIN tblMineProject MP ON U.MineProjectID = MP.MineProjectID AND MP.ActiveInd = 'Y'
) TBL1
INNER JOIN CTE_UnitConversion Distance ON tbl1.DistanceFromToUnitID = Distance.UnitID
INNER JOIN CTE_UnitConversion Depth ON tbl1.DepthUnitID = Depth.UnitID
""")
创建表并写入数据
# Target table name, folder segment and database for the fact table.
tblName = "fact_activity"
fldrName = "myfolder"
dbName = "mydatabase"
path = fn_Path(fldrName,dbName,tblName)
path
# Write the result as a single parquet file.
# repartition(1) is used instead of coalesce(1): a drastic coalesce can be
# folded into the preceding stage so that the joins and projections feeding
# the write run as one task. repartition adds one shuffle but lets the
# upstream work stay parallel; the output is still one file.
df.repartition(1).write.format("parquet").mode("overwrite").save(path)
# Drop the table (only dropping the metadata) if it exists in the lakehouse database
spark.sql(f"DROP TABLE IF EXISTS {dbName}.{tblName}")
# Now create the table (metadata only) and point it at the data in the datalake
spark.sql(f"CREATE TABLE {dbName}.{tblName} USING PARQUET LOCATION '{path}'")
从内存中释放SQL视图
%%sql
-- Drop the derived views first, then the source temp views.
-- NOTE(review): tblDrillType, tblUnit, tblUnitType, tblWorkSubCategory,
-- tblWorkSubCategoryType and the three v_* views are registered earlier in
-- the notebook but are not dropped here - confirm whether that is intended.
-- NOTE(review): DROP VIEW without IF EXISTS fails if a view is missing.
DROP VIEW SECTION_1;
DROP VIEW CTE_DailyReportHoleActivityManHours;
DROP VIEW CTE_Dashboards_BaseData;
DROP VIEW CTE_UnitConversion;
DROP VIEW tblBitSize;
DROP VIEW tblDailyReport;
DROP VIEW tblDailyReportHole;
DROP VIEW tblDailyReportHoleActivity;
DROP VIEW tblDailyReportHoleActivityHours;
DROP VIEW tblDailyReportShift;
DROP VIEW tblDrill;
DROP VIEW tblEmployee;
DROP VIEW tblHole;
DROP VIEW tblMineProject;
DROP VIEW tblShift;
从内存释放数据帧
# Unbind the source DataFrame variables, in the same order as before.
# NOTE(review): DF_tblWorkSubCategoryType is not part of this list.
del (
    DF_tblBitSize,
    DF_tblDailyReport,
    DF_tblDailyReportHole,
    DF_tblDailyReportHoleActivity,
    DF_tblDailyReportHoleActivityHours,
    DF_tblDailyReportShift,
    DF_tblDrill,
    DF_tblDrillType,
    DF_tblEmployee,
    DF_tblHole,
    DF_tblMineProject,
    DF_tblShift,
    DF_tblUnit,
    DF_tblUnitType,
    DF_tblWorkSubCategory,
    DF_v_Dashboards_CompanyContracts,
    DF_v_DailyReportShiftDrillers,
    DF_v_ActivityCharges,
)
“从内存中释放SQL视图”和“从内存释放数据帧”这两部分都没有问题。
如果您的应用程序需要频繁地查询数据,并且需要创建视图(VIEWS),您可以在专用SQL池中创建EXTERNAL TABLE,并使用Synapse SQL保存该表的视图。这样会更高效,也不需要在每次需要数据时删除视图并释放数据帧。
您还可以在Azure Synapse Analytics中使用SQL池创建和使用本机外部表,因为与在外部数据源定义中使用TYPE=HADOOP的外部表相比,本机外部表具有更好的性能。这是因为本机外部表使用本机代码访问外部数据。
您还可以参考Azure Synapse Analytics中无服务器SQL池的最佳实践,以获得有关性能优化的更多详细信息。