Rewriting Apache Spark Scala code as PySpark



Community, I am not familiar with Scala and not very familiar with PySpark either, so I am hoping someone can help me rewrite the following Apache Spark Scala code as PySpark.

If you ask what I have done so far to help myself, I will honestly say very little, because I am still at an early stage with this code.

So if you could help recode the following into PySpark, or set me on the right path so that I can recode it myself, it would be very helpful.

import org.apache.spark.sql.DataFrame

def readParquet(basePath: String): DataFrame = {
  val parquetDf = spark
    .read
    .parquet(basePath)
  return parquetDf
}

def num(df: DataFrame): Int = {
  val numPartitions = df.rdd.getNumPartitions
  return numPartitions
}

def ram(size: Int): Int = {
  val ramMb = size
  return ramMb
}

def target(size: Int): Int = {
  val targetMb = size
  return targetMb
}

def dp(): Int = {
  val defaultParallelism = spark.sparkContext.defaultParallelism
  return defaultParallelism
}

def files(dp: Int, multiplier: Int, ram: Int, target: Int): Int = {
  // convert to Double before dividing so ceil can round a fractional result up
  val maxPartitions = Math.max(dp * multiplier, Math.ceil(ram.toDouble / target).toInt)
  return maxPartitions
}

def split(df: DataFrame, max: Int): DataFrame = {
  val repartitionDf = df.repartition(max)
  return repartitionDf
}

def writeParquet(df: DataFrame, targetPath: String): Unit = {
  df.write.format("parquet").mode("overwrite").save(targetPath)
}

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("spark-repartition-optimizer-app").getOrCreate()
spark.conf.set("spark.sql.shuffle.partitions", 2001) // example

val parquetDf = readParquet("/blogs/source/airlines.parquet/")
val numPartitions = num(parquetDf)
val ramMb = ram(6510) // approx. df cache size
val targetMb = target(128) // approx. partition size (between 50 and 200 MB)
val defaultParallelism = dp()
val maxPartitions = files(defaultParallelism, 2, ramMb, targetMb)
val repartitionDf = split(parquetDf, maxPartitions)
writeParquet(repartitionDf, "/blogs/optimized/airlines.parquet/")

In the end I just recoded the Scala code into PySpark myself.

This was fixed by including the following module in PySpark.

import module
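
For anyone who lands here looking for the same thing, a rough PySpark equivalent of the Scala above might look like the sketch below. It follows the same steps (read the Parquet source, work out a partition count from an estimated cache size and a target partition size, repartition, write back out). The helper names are my own renaming of the Scala ones, and the paths plus the 6510 MB / 128 MB figures are just the example values carried over from the question, so adjust them for your own data.

import math

from pyspark.sql import SparkSession, DataFrame


def read_parquet(spark: SparkSession, base_path: str) -> DataFrame:
    # Read the source Parquet dataset
    return spark.read.parquet(base_path)


def num_partitions(df: DataFrame) -> int:
    # Current number of partitions of the DataFrame's underlying RDD
    return df.rdd.getNumPartitions()


def default_parallelism(spark: SparkSession) -> int:
    return spark.sparkContext.defaultParallelism


def max_partitions(dp: int, multiplier: int, ram_mb: int, target_mb: int) -> int:
    # At least dp * multiplier partitions, or enough partitions of roughly
    # target_mb each, whichever is larger
    return max(dp * multiplier, math.ceil(ram_mb / target_mb))


def split(df: DataFrame, num_parts: int) -> DataFrame:
    # Full shuffle into the computed number of partitions
    return df.repartition(num_parts)


def write_parquet(df: DataFrame, target_path: str) -> None:
    df.write.format("parquet").mode("overwrite").save(target_path)


if __name__ == "__main__":
    spark = SparkSession.builder.appName("spark-repartition-optimizer-app").getOrCreate()
    spark.conf.set("spark.sql.shuffle.partitions", 2001)  # example

    parquet_df = read_parquet(spark, "/blogs/source/airlines.parquet/")
    print("source partitions:", num_partitions(parquet_df))

    ram_mb = 6510    # approx. cached size of the DataFrame, in MB
    target_mb = 128  # approx. target partition size (between 50 and 200 MB)
    dp = default_parallelism(spark)
    parts = max_partitions(dp, 2, ram_mb, target_mb)

    repartition_df = split(parquet_df, parts)
    write_parquet(repartition_df, "/blogs/optimized/airlines.parquet/")

With the example numbers, max_partitions evaluates to max(defaultParallelism * 2, ceil(6510 / 128)) = max(defaultParallelism * 2, 51), i.e. the job writes at least 51 output files of roughly 128 MB each.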
