如何在 Pyspark 中使用 Scala 类



我一直在寻找是否有任何方法可以在Pyspark中使用Scala类,但我没有找到任何关于这个主题的文档或指南。

假设我在 Scala 中创建一个简单的类,它使用一些 apache-spark 库,如下所示:

class SimpleClass(sqlContext: SQLContext, df: DataFrame, column: String) {
  def exe(): DataFrame = {
    import sqlContext.implicits._
    df.select(col(column))
  }
}
  • 有没有办法在Pyspark中使用这个类?
  • 是不是太难了?
  • 我必须创建一个.py文件吗?
  • 有没有说明如何做到这一点的指南?

顺便说一下,我还查看了spark代码,我感到有点迷茫,我无法为自己的目的复制它们的功能。

是的,这是可能的,尽管可能远非微不足道。通常,您需要一个Java(友好)包装器,这样您就不必处理Scala功能,这些功能无法使用普通Java轻松表达,因此不能很好地与Py4J网关配合使用。

假设你的类是 int 包com.example并且有 Python DataFrame 调用df

df = ... # Python DataFrame

您必须:

  1. 使用您最喜欢的构建工具构建 jar。

  2. 例如,将其包含在驱动程序类路径中--driver-class-path PySpark shell/spark-submit 的参数。根据确切的代码,您可能还必须使用--jars传递它

  3. 从 Python SparkContext 实例中提取 JVM 实例:

    jvm = sc._jvm
    
  4. SQLContext实例中提取 Scala SQLContext

    ssqlContext = sqlContext._ssql_ctx
    
  5. df中提取 Java DataFrame

    jdf = df._jdf
    
  6. 创建SimpleClass的新实例:

    simpleObject = jvm.com.example.SimpleClass(ssqlContext, jdf, "v")
    
  7. 调用exe方法并使用 Python DataFrame 包装结果:

    from pyspark.sql import DataFrame
    DataFrame(simpleObject.exe(), ssqlContext)
    

结果应该是有效的 PySpark DataFrame 。当然,您可以将所有步骤合并到一个调用中。

重要提示:仅当 Python 代码仅在驱动程序上执行时,此方法才可用。它不能在 Python 操作或转换中使用。请参阅如何从操作或转换中使用 Java/Scala 函数?了解详情。

作为对@zero323答案的更新,鉴于Spark的API在过去六年中不断发展,在Spark-3.2中起作用的方法如下:

  1. 将 Scala 代码编译为 JAR 文件(例如使用 sbt assembly
  2. 将 JAR 文件包含在 --jars 参数中,以便与本地包定义所需的任何--py-files参数一起spark-submit
  3. 在 Python 中提取 JVM 实例:
jvm = spark._jvm
  1. 提取SparkSession的 Java 表示形式:
jSess = spark._jsparkSession
  1. 提取要传递给 Scala 方法的 PySpark DataFrame"df"的 Java 句柄:
jdf = df._jdf
  1. 从 PySpark 中创建一个新的SimpleClass实例:
simpleObject = jvm.com.example.SimpleClass(jSess, jdf, "v")
  1. 调用 exe 方法并将其输出转换为 PySpark DataFrame
from pyspark.sql import DataFrame
result = DataFrame(simpleObject.exe(), spark)

如果你需要传递其他参数,比如Python字典,PySpark可能会在它们出现在你的Scala方法之前自动将它们转换为相应的Java类型。Scala 提供了JavaConverters包来帮助将其转换为更自然的 Scala 数据类型。例如,Python 字典可以传递到 Scala 方法中,并立即从 Java HashMap 转换为 Scala(可变)映射:

def processDict(spark: SparkSession, jparams: java.util.Map[String, Any]) {
  import scala.collection.JavaConverters._
  val params = jparams.asScala
  ...
}

相关内容

  • 没有找到相关文章

最新更新