Azure Synapse serverless SQL pool - how to optimize a notebook



Is there a better way to optimize the notebook below? It currently takes 2 minutes and 20 seconds per run. How can I improve performance? Any suggestions would be greatly appreciated. Thanks.

Environment:

  • Medium Spark pool (8 vCores / 64 GB), 3-30 nodes, 10 executors
  • ADLS Gen2 Premium (SSD)

Set environment variables

    environment = "mydatalake"
    fileFormat = "parquet"

Function - set the path for loading the source parquet files

    # Create the variables used by the function
    tblName = ""
    fldrName = ""
    dbName = ""
    filePrefix = ""
    # Create the function
    def fn_PathSource(fldrName,dbName,tblName,fileFormat,filePrefix):
        str_path0 = "spark.read.load("
        str_path1 = "'abfss://"
        str_path2 = ".dfs.core.windows.net/sources"
        str_path3 = ", format="
        return f"{str_path0}{str_path1}{fldrName}@{environment}{str_path2}/{dbName}/{tblName}/{dbName}{filePrefix}{tblName}.{fileFormat}'{str_path3}'{fileFormat}')"
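Because the function returns a string of code that is later passed to `eval()`, one simpler alternative (a sketch with a hypothetical helper name, not the notebook's current code) is to return only the path and call `spark.read.load` directly, avoiding `eval()` entirely:

```python
environment = "mydatalake"  # same value as the notebook's environment variable

# Hypothetical helper: build the ADLS path only, instead of a code string.
def fn_path_source(fldrName, dbName, tblName, fileFormat, filePrefix):
    return (
        f"abfss://{fldrName}@{environment}.dfs.core.windows.net"
        f"/sources/{dbName}/{tblName}/{dbName}{filePrefix}{tblName}.{fileFormat}"
    )

path = fn_path_source("silver", "MineDB", "tblHole", "parquet", "_dbo_")
print(path)
# Inside Synapse the read then becomes:
# DF_tblHole = spark.read.load(path, format="parquet")
```

This keeps the path logic testable on its own and makes the read calls easier to follow.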
    

Function - set the path where the table's data is stored in the lakehouse database

    # Create the variables used by the function
    tblName = ""
    fldrName = ""
    dbName = ""
    # Create the function
    def fn_Path(fldrName,dbName,tblName):
        str_path1 = "abfss://"
        str_path2 = ".dfs.core.windows.net"
        return f"{str_path1}{fldrName}@{environment}{str_path2}/{dbName}/{tblName}/"

Function - get the latest version of each record

    from pyspark.sql.functions import md5, concat_ws, col
    # Create the variables used by the function
    uniqueId = ""
    versionId = ""
    tblName = ""
    # Create the function
    def fn_ReadLatestVrsn(uniqueId,versionId,tblName):
        # Latest modification timestamp per unique key
        df_Max = spark.sql(f"SELECT {uniqueId},MAX({versionId}) AS {versionId}Max FROM {tblName} GROUP BY {uniqueId}")
        df_Max.createOrReplaceTempView(f"{tblName}Max")
        df_Latest = spark.sql(f"SELECT {uniqueId},{versionId}Max FROM {tblName}Max")
        # Hash key identifying the latest (key, timestamp) pair
        df_Latest = df_Latest.withColumn("HashKey",md5(concat_ws("",col(f"{uniqueId}").cast("string"),col(f"{versionId}Max").cast("string"))))
        df_Latest.createOrReplaceTempView(f"{tblName}Latest")
        # Hash every row of the source table the same way...
        df_Hash = spark.sql(f"SELECT * FROM {tblName}")
        df_Hash = df_Hash.withColumn("HashKey",md5(concat_ws("",col(f"{uniqueId}").cast("string"),col(f"{versionId}").cast("string"))))
        df_Hash.createOrReplaceTempView(f"{tblName}Hash")
        # ...and keep only the rows whose hash matches a latest version
        df_Final = spark.sql(f"SELECT DISTINCT t1.* FROM {tblName}Hash t1 INNER JOIN {tblName}Latest t2 ON t1.HashKey = t2.HashKey")
        df_Final.createOrReplaceTempView(f"{tblName}")
        return spark.sql(f"SELECT * FROM {tblName}")
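A rewrite worth benchmarking (an assumption, not the notebook's current logic): a single window function usually does the same "latest version per key" job with fewer temp views and shuffles than the MD5-join approach. For example, for tblBitSize:

```sql
-- Hypothetical rewrite: keep only the newest row per BitSizeID.
SELECT * FROM (
    SELECT t.*,
           ROW_NUMBER() OVER (PARTITION BY BitSizeID
                              ORDER BY LastModDateTime DESC) AS rn
    FROM tblBitSize t
) x
WHERE rn = 1
```

The same pattern applies to each table by substituting the key and version columns.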

Load dataframes with the source table data

        DF_tblBitSize = eval(fn_PathSource("silver","MineDB","tblBitSize","parquet","_dbo_"))
        DF_tblDailyReport = eval(fn_PathSource("silver","MineDB","tblDailyReport","parquet","_dbo_"))
        DF_tblDailyReportHole = eval(fn_PathSource("silver","MineDB","tblDailyReportHole","parquet","_dbo_"))
        DF_tblDailyReportHoleActivity = eval(fn_PathSource("silver","MineDB","tblDailyReportHoleActivity","parquet","_dbo_"))
        DF_tblDailyReportHoleActivityHours = eval(fn_PathSource("silver","MineDB","tblDailyReportHoleActivityHours","parquet","_dbo_"))
        DF_tblDailyReportShift = eval(fn_PathSource("silver","MineDB","tblDailyReportShift","parquet","_dbo_"))
        DF_tblDrill = eval(fn_PathSource("silver","MineDB","tblDrill","parquet","_dbo_"))
        DF_tblDrillType = eval(fn_PathSource("silver","MineDB","tblDrillType","parquet","_dbo_"))
        DF_tblEmployee = eval(fn_PathSource("silver","MineDB","tblEmployee","parquet","_dbo_"))
        DF_tblHole = eval(fn_PathSource("silver","MineDB","tblHole","parquet","_dbo_"))
        DF_tblMineProject = eval(fn_PathSource("silver","MineDB","tblMineProject","parquet","_dbo_"))
        DF_tblShift = eval(fn_PathSource("silver","MineDB","tblShift","parquet","_dbo_"))
        DF_tblUnit = eval(fn_PathSource("silver","MineDB","tblUnit","parquet","_dbo_"))
        DF_tblUnitType = eval(fn_PathSource("silver","MineDB","tblUnitType","parquet","_dbo_"))
        DF_tblWorkSubCategory = eval(fn_PathSource("silver","MineDB","tblWorkSubCategory","parquet","_dbo_"))
        DF_tblWorkSubCategoryType = eval(fn_PathSource("silver","MineDB","tblWorkSubCategoryType","parquet","_dbo_"))
        DF_v_Dashboards_CompanyContracts= eval(fn_PathSource("silver","MineDB","v_Dashboards_CompanyContracts","parquet","_"))
        DF_v_DailyReportShiftDrillers = eval(fn_PathSource("silver","MineDB","v_DailyReportShiftDrillers","parquet","_"))
        DF_v_ActivityCharges = eval(fn_PathSource("silver","MineDB","v_ActivityCharges","parquet","_"))

Convert the dataframes into temporary views that can be used in SQL

        DF_tblBitSize.createOrReplaceTempView("tblBitSize")
        DF_tblDailyReport.createOrReplaceTempView("tblDailyReport")
        DF_tblDailyReportHole.createOrReplaceTempView("tblDailyReportHole")
        DF_tblDailyReportHoleActivity.createOrReplaceTempView("tblDailyReportHoleActivity")
        DF_tblDailyReportHoleActivityHours.createOrReplaceTempView("tblDailyReportHoleActivityHours")
        DF_tblDailyReportShift.createOrReplaceTempView("tblDailyReportShift")
        DF_tblDrill.createOrReplaceTempView("tblDrill")
        DF_tblDrillType.createOrReplaceTempView("tblDrillType")
        DF_tblEmployee.createOrReplaceTempView("tblEmployee")
        DF_tblHole.createOrReplaceTempView("tblHole")
        DF_tblMineProject.createOrReplaceTempView("tblMineProject")
        DF_tblShift.createOrReplaceTempView("tblShift")
        DF_tblUnit.createOrReplaceTempView("tblUnit")
        DF_tblUnitType.createOrReplaceTempView("tblUnitType")
        DF_tblWorkSubCategory.createOrReplaceTempView("tblWorkSubCategory")
        DF_tblWorkSubCategoryType.createOrReplaceTempView("tblWorkSubCategoryType")
        DF_v_Dashboards_CompanyContracts.createOrReplaceTempView("v_Dashboards_CompanyContracts")
        DF_v_DailyReportShiftDrillers.createOrReplaceTempView("v_DailyReportShiftDrillers")
        DF_v_ActivityCharges.createOrReplaceTempView("v_ActivityCharges")

Load the latest data into the views

When an existing record in a source system table is updated (or soft-deleted), Azure Data Factory captures that change by creating an incremental parquet file. The same happens when a new record is created. During the merge process, all incremental files are combined into a single parquet file. For an existing record that was updated (or soft-deleted), the merge produces two versions of the record, with the latest version appended. If you query the merged parquet file, you will see duplicate records. Therefore, to see only the latest version of each record, we need to remove the previous versions. This function ensures we only see the latest version of every record.

**Special note: this logic is unnecessary for tables with no soft-deleted records (e.g., tables without a LastModDateTime or ActiveInd column), so we do not apply this function to those tables.**
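As a toy illustration (pure Python, not Spark; the rows are made up) of what "keep only the latest version per key" means:

```python
# Two versions of id 1 exist after the merge; only the newest should survive.
rows = [
    {"id": 1, "LastModDateTime": "2023-01-01", "val": "old"},
    {"id": 1, "LastModDateTime": "2023-02-01", "val": "new"},
    {"id": 2, "LastModDateTime": "2023-01-15", "val": "only"},
]
latest = {}
for r in rows:
    # Keep the row with the greatest LastModDateTime per id
    if r["id"] not in latest or r["LastModDateTime"] > latest[r["id"]]["LastModDateTime"]:
        latest[r["id"]] = r
print(sorted(r["val"] for r in latest.values()))  # → ['new', 'only']
```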

    DF_tblBitSize = fn_ReadLatestVrsn("BitSizeID","LastModDateTime","tblBitSize")
    DF_tblDailyReport = fn_ReadLatestVrsn("DailyReportID","LastModDateTime","tblDailyReport")
    DF_tblDailyReportHole = fn_ReadLatestVrsn("DailyReportHoleID","LastModDateTime","tblDailyReportHole")
    DF_tblDailyReportHoleActivity = fn_ReadLatestVrsn("DailyReportHoleActivityID","LastModDateTime","tblDailyReportHoleActivity")
    DF_tblDailyReportHoleActivityHours = fn_ReadLatestVrsn("DailyReportHoleActivityHoursID","LastModDateTime","tblDailyReportHoleActivityHours")
    DF_tblDailyReportShift = fn_ReadLatestVrsn("DailyReportShiftID","LastModDateTime","tblDailyReportShift")
    DF_tblDrill = fn_ReadLatestVrsn("DrillID","LastModDateTime","tblDrill")
    DF_tblEmployee = fn_ReadLatestVrsn("EmployeeID","LastModDateTime","tblEmployee")
    DF_tblHole = fn_ReadLatestVrsn("HoleID","LastModDateTime","tblHole")
    DF_tblMineProject = fn_ReadLatestVrsn("MineProjectID","LastModDateTime","tblMineProject")
    DF_tblShift = fn_ReadLatestVrsn("ShiftID","LastModDateTime","tblShift")
    DF_tblWorkSubCategoryType = fn_ReadLatestVrsn("WorkSubCategoryTypeID","LastModDateTime","tblWorkSubCategoryType")

CTE_UnitConversion

    %%sql
    CREATE OR REPLACE TEMP VIEW  CTE_UnitConversion AS
    (
        SELECT 
            u.UnitID
            ,ut.UnitType
            ,u.UnitName
            ,u.UnitAbbr
            ,COALESCE(CAST(u.Conversion AS FLOAT),1) AS Conversion
        FROM 
            tblUnit u 
            INNER JOIN tblUnitType ut 
                ON u.UnitTypeID = ut.UnitTypeID
                AND ut.UnitType IN ('Distance','Depth')
        UNION
        SELECT 
            -1 AS UnitID
            ,'Unknown' AS UnitType
            ,'Unknown' AS UnitName
            ,'Unknown' AS UnitAbbr
            ,1 AS Conversion
    )

CTE_Dashboards_BaseData

    %%sql
    CREATE OR REPLACE TEMP VIEW  CTE_Dashboards_BaseData AS
    (
        SELECT 
            CC.ContractID,
            CC.ProjectID,
            CAST(DR.ReportDate AS DATE) AS ReportDate,
            D.DrillID,
            CAST(D.DrillName AS STRING) AS DrillName,
            DT.DrillTypeID,
            CAST(DT.DrillType AS STRING) AS DrillType,
            CAST(NULL AS STRING) AS HoleName,
            CAST(S.ShiftName AS STRING) AS ShiftName,
            STRING(CONCAT(E.LastName,' ',E.FirstName)) AS Supervisor,
            CAST(DRSD.Drillers AS STRING) AS Driller,
            CAST(NULL AS FLOAT) AS TotalMeterage,
            CAST(NULL AS FLOAT) AS Depth,
            CAST(NULL AS STRING) AS DepthUnit,
            CAST(NULL AS FLOAT) AS ManHours,
            CAST(NULL AS FLOAT) AS Payrollhours,
            CAST(NULL AS FLOAT) AS ActivityHours,
            CAST(NULL AS FLOAT) AS EquipmentHours,
            CAST(NULL AS FLOAT) AS Quantity,
            CAST(NULL AS STRING) AS Category,
            CAST(NULL AS STRING) AS SubCategory,
            CAST(NULL AS STRING) AS HoursType,
            CAST(NULL AS STRING) AS BitSize,
            CAST(DRS.DailyReportShiftID AS BIGINT) AS DailyReportShiftID,
            CAST(DRS.ShiftID AS INT) AS ShiftID,
            CAST(NULL AS TIMESTAMP) AS CompleteDateTime,
            CAST(NULL AS STRING) AS HoleCompletionStatus,
            CAST(NULL AS STRING) AS Notes,
            CAST(NULL AS INT) AS HoleID,
            CAST(NULL AS FLOAT) AS DistanceFrom,
            CAST(NULL AS FLOAT) AS DistanceTo,
            CAST(NULL AS STRING) AS DistanceFromToUnit,
            CAST(NULL AS FLOAT) AS Distance,
            CAST(NULL AS STRING) AS DistanceUnit,
            CAST(NULL AS STRING) AS FluidUnit,
            CAST(NULL AS FLOAT) AS FluidVolume,
            CAST(NULL AS STRING) AS UID,
            CAST(NULL AS FLOAT) AS MaxDepth,
            CAST(NULL AS FLOAT) AS Penetration,
            CAST(NULL AS FLOAT) AS Charges,
            CAST(DR.Status AS STRING) AS Status,
            CAST(DRS.LastModDateTime AS TIMESTAMP) AS LastModDateTime
        FROM 
            v_Dashboards_CompanyContracts CC
            LEFT JOIN tblDailyReport DR ON CC.ContractID = DR.ContractID AND CC.ProjectID = DR.ProjectID
            LEFT JOIN tblDailyReportShift DRS ON DR.DailyReportID = DRS.DailyReportID
            LEFT JOIN tblShift S ON DRS.ShiftID = S.ShiftID
            LEFT JOIN tblDrill D ON DR.DrillID = D.DrillID
            LEFT JOIN tblDrillType DT ON D.DrillTypeID = DT.DrillTypeID
            LEFT JOIN tblEmployee E ON DRS.SupervisorID = E.EmployeeID
            LEFT JOIN v_DailyReportShiftDrillers DRSD ON DRS.DailyReportShiftID = DRSD.DailyReportShiftID
        WHERE 
            DR.Status <> 'Deleted'
    )

CTE_DailyReportHoleActivityManHours

    %%sql
    CREATE OR REPLACE TEMP VIEW  CTE_DailyReportHoleActivityManHours AS
    (
      SELECT 
        DailyReportHoleActivityID
        ,SUM(HoursAsFloat) AS ManHours
      FROM 
        tblDailyReportHoleActivityHours
      WHERE 
        ActiveInd = 'Y'
      GROUP BY 
        DailyReportHoleActivityID
    )

Activity charges

    %%sql
    CREATE OR REPLACE TEMP VIEW SECTION_1 AS
    (
        SELECT 
            BD.ContractID
            ,BD.ProjectID
            ,CAST(ReportDate AS DATE) AS ReportDate
            ,DrillID
            ,DRHA.Depth
            ,DPU.UnitAbbr AS DepthUnit
            ,DPU.UnitID AS DepthUnitID
            ,DRHAMH.ManHours
            ,DRHA.ActivityHoursAsFloat AS ActivityHours
            ,WSC.WorkSubCategoryName AS Category
            ,WSCT.TypeName AS SubCategory
            ,CASE 
                WHEN (COALESCE(AC.Charges,0) = 0 AND COALESCE(AC.BillableCount, 0) = 0) OR DRHA.Billable='N' THEN 'Non-Billable'
                WHEN AC.DefinedRateName IS NOT NULL AND DRHA.Billable <> 'N' THEN AC.DefinedRateName
                ELSE WSC.WorkSubCategoryName 
            END AS HoursType
            ,BS.BitSizeID AS BitSizeID
            ,BS.BitSize
            ,DRHA.BitID AS BitID
            ,BD.DailyReportShiftID
            ,DRHA.Notes
            ,H.HoleID
            ,DRHA.DistanceFrom
            ,DRHA.DistanceTo
            ,DFU.UnitAbbr AS DistanceFromToUnit
            ,DFU.UnitID AS DistanceFromToUnitID
            ,DRHA.Distance
            ,DU.UnitID AS DistanceUnitID
            ,CASE 
                WHEN WSC.WorkCategoryId  = 1 THEN MAX(COALESCE(DRHA.DistanceTo, 0)) OVER ( PARTITION BY H.HoleID, WSC.WorkSubCategoryName  ORDER BY H.HoleID, ReportDate, BD.ShiftID, DRHA.SequenceNumber, DRHA.CreateDateTime, DRHA.DistanceTo)
                ELSE NULL
            END AS MaxDepth
            ,CASE 
                WHEN WSC.WorkCategoryId  = 1 THEN DRHA.Penetration
                ELSE 0
            END AS Penetration
            ,COALESCE(AC.Charges,0) AS Charges
            ,BD.Status
            ,H.MineProjectID
            ,CAST(DRHA.LastModDateTime AS TIMESTAMP) AS LastModDateTime
        FROM 
            CTE_Dashboards_BaseData BD
            INNER JOIN tblDailyReportHole DRH ON BD.DailyReportShiftID = DRH.DailyReportShiftID
            INNER JOIN tblDailyReportHoleActivity DRHA ON DRH.DailyReportHoleID = DRHA.DailyReportHoleID
            INNER JOIN tblWorkSubCategory WSC ON DRHA.WorkSubCategoryID = WSC.WorkSubCategoryID
            LEFT JOIN tblHole H ON DRH.HoleID = H.HoleID
            LEFT JOIN tblBitSize BS ON DRHA.BitSizeID = BS.BitSizeID
            LEFT JOIN tblUnit DPU ON DRHA.DepthUnitID = DPU.UnitID 
            LEFT JOIN tblUnit DFU ON DRHA.DistanceFromToUnitID = DFU.UnitID 
            LEFT JOIN tblUnit DU ON DRHA.DistanceUnitID = DU.UnitID 
            LEFT JOIN tblWorkSubCategoryType WSCT ON DRHA.TypeID = WSCT.WorkSubCategoryTypeID
            LEFT JOIN v_ActivityCharges AC ON DRHA.DailyReportHoleActivityID = AC.DailyReportHoleActivityID
            LEFT JOIN CTE_DailyReportHoleActivityManHours DRHAMH ON DRHA.DailyReportHoleActivityID = DRHAMH.DailyReportHoleActivityID
        WHERE 
            DRH.ActiveInd = 'Y' 
            AND DRHA.ActiveInd = 'Y'
    )

Create the FACT_Activity table

    df = spark.sql("""
        SELECT
            ReportDate
            ,DrillingCompanyID
            ,MiningCompanyID
            ,DrillID
            ,ProjectID
            ,ContractID
            ,LocationID
            ,HoleID
            ,DailyReportShiftId
            ,MineProjectID
            ,BitID
            ,TRIM(UPPER(BitSize)) AS BitSize
            ,-1 AS TimesheetId
            ,CurrencyID
            ,TRIM(UPPER(Category)) AS Category
            ,TRIM(UPPER(SubCategory)) AS SubCategory
            ,TRIM(UPPER(HoursType)) AS HoursType
            ,TRIM(UPPER(Notes)) AS Notes
            ,ApprovalStatus
            ,Depth AS Depth
            ,(Depth/COALESCE(Depth.Conversion,1)) AS DepthMeters
            ,Manhours
            ,ActivityHours
            ,DistanceFrom
            ,DistanceTo
            ,Distance
            ,Penetration
            ,(DistanceFrom/Distance.Conversion) AS DistanceFromMeters
            ,(DistanceTo/Distance.Conversion) AS DistanceToMeters
            ,(Distance/Distance.Conversion) AS DistanceMeters
            ,(Penetration/Distance.Conversion) AS PenetrationMeters
            ,DepthUnitID
            ,DistanceFromToUnitID
            ,Charges
            ,LastModDateTime
            ,ReportApprovalRequired
        FROM
        (
            SELECT 
                COALESCE(CAST(ReportDate AS DATE),'01/01/1900') AS ReportDate
                ,COALESCE(DrillingCompanyID,-1) AS DrillingCompanyID
                ,COALESCE(MiningCompanyID,-1) AS MiningCompanyID
                ,COALESCE(DrillID,-1) AS DrillID
                ,COALESCE(C.ProjectID, -1) AS ProjectID
                ,COALESCE(C.ContractID,-1) AS ContractID
                ,COALESCE(C.LocationID,-1) AS LocationID
                ,COALESCE(HoleID,-1) AS HoleID
                ,COALESCE(DailyReportShiftID,-1) AS DailyReportShiftId
                ,COALESCE(MP.MineProjectID,-1) AS MineProjectID
                ,COALESCE(BitID,-1) AS BitID
                ,COALESCE(BitSize,'UNKNOWN') AS BitSize
                ,COALESCE(DepthUnitID,-1) AS DepthUnitID
                ,COALESCE(DistanceFromToUnitID,-1) AS DistanceFromToUnitID
                ,COALESCE(DistanceUnitID,-1) AS DistanceUnitID
                ,COALESCE(C.CurrencyID,-1) AS CurrencyID
                ,COALESCE(Category,'Unknown') AS Category
                ,COALESCE(SubCategory,'UNKNOWN') AS SubCategory
                ,COALESCE(HoursType,'UNKNOWN') AS HoursType
                ,SUBSTRING(Notes,0,250) AS Notes
                ,COALESCE(U.Status,'Unknown') AS ApprovalStatus
                ,COALESCE(Depth,0) AS Depth
                ,COALESCE(Manhours,0) AS Manhours
                ,COALESCE(ActivityHours,0) AS ActivityHours
                ,COALESCE(DistanceFrom,0) AS DistanceFrom
                ,COALESCE(DistanceTo,0) AS DistanceTo
                ,COALESCE(Distance,0) AS Distance
                ,COALESCE(Penetration,0) AS Penetration
                ,COALESCE(Charges,0) AS Charges
                ,COALESCE(CAST(U.LastModDateTime AS TIMESTAMP),'1900/01/01 00:00:00') AS LastModDateTime
                ,C.ReportApprovalRequired
            FROM
                SECTION_1 U
                LEFT JOIN v_Dashboards_CompanyContracts C ON U.ContractID = C.ContractID AND COALESCE(U.ProjectID,-1) = C.ProjectID
                LEFT JOIN tblMineProject MP ON U.MineProjectID = MP.MineProjectID AND MP.ActiveInd = 'Y'
        ) TBL1
        INNER JOIN CTE_UnitConversion Distance ON tbl1.DistanceFromToUnitID = Distance.UnitID
        INNER JOIN CTE_UnitConversion Depth ON tbl1.DepthUnitID = Depth.UnitID
    """)

Create the table and write the data

    tblName = "fact_activity"
    fldrName = "myfolder"
    dbName = "mydatabase"
    path = fn_Path(fldrName,dbName,tblName)
    path
    # Reduce the number of parquet files written using coalesce and write the dataframe to the datalake
    df.coalesce(1).write.format("parquet").mode("overwrite").save(path)
    # Drop the table (only dropping the metadata) if it exists in the lakehouse database
    spark.sql(f"DROP TABLE IF EXISTS {dbName}.{tblName}")
    # Now create the table (metadata only) and point it at the data in the datalake
    spark.sql(f"CREATE TABLE {dbName}.{tblName} USING PARQUET LOCATION '{path}'")

Release the SQL views from memory

    %%sql
    DROP VIEW SECTION_1;
    DROP VIEW CTE_DailyReportHoleActivityManHours;
    DROP VIEW CTE_Dashboards_BaseData;
    DROP VIEW CTE_UnitConversion;
    DROP VIEW tblBitSize;
    DROP VIEW tblDailyReport;
    DROP VIEW tblDailyReportHole;
    DROP VIEW tblDailyReportHoleActivity;
    DROP VIEW tblDailyReportHoleActivityHours;
    DROP VIEW tblDailyReportShift;
    DROP VIEW tblDrill;
    DROP VIEW tblEmployee;
    DROP VIEW tblHole;
    DROP VIEW tblMineProject;
    DROP VIEW tblShift;

Release the dataframes from memory

    del DF_tblBitSize
    del DF_tblDailyReport
    del DF_tblDailyReportHole
    del DF_tblDailyReportHoleActivity
    del DF_tblDailyReportHoleActivityHours
    del DF_tblDailyReportShift
    del DF_tblDrill
    del DF_tblDrillType
    del DF_tblEmployee
    del DF_tblHole
    del DF_tblMineProject
    del DF_tblShift
    del DF_tblUnit
    del DF_tblUnitType
    del DF_tblWorkSubCategory
    del DF_v_Dashboards_CompanyContracts
    del DF_v_DailyReportShiftDrillers
    del DF_v_ActivityCharges

Releasing the SQL views and the dataframes from memory at the end is all fine.

If your application needs to query the data frequently and you need to create VIEWS, you can create an EXTERNAL TABLE in a dedicated SQL pool and persist views over the table with Synapse SQL. This is more efficient, and you avoid having to drop the VIEWS and release the dataframes every time the data is needed.

You can also create and use native external tables with SQL pools in Azure Synapse Analytics, since native external tables perform better than external tables defined with TYPE=HADOOP in their external data source definition. This is because native external tables use native code to access the external data.
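For example, in a serverless SQL pool a view can be defined directly over the parquet folder the notebook writes to, so consumers query it without Spark (the database, storage URL, and view name below are illustrative):

```sql
-- Hypothetical serverless SQL pool view over the fact_activity parquet folder.
CREATE VIEW dbo.v_fact_activity AS
SELECT *
FROM OPENROWSET(
    BULK 'https://mydatalake.dfs.core.windows.net/myfolder/mydatabase/fact_activity/',
    FORMAT = 'PARQUET'
) AS rows;
```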

You can also refer to the best practices for serverless SQL pool in Azure Synapse Analytics for more detail on performance optimization.
