我一直在寻找是否有任何方法可以在Pyspark
中使用Scala
类,但我没有找到任何关于这个主题的文档或指南。
假设我在 Scala
中创建一个简单的类,它使用一些 apache-spark
库,如下所示:
class SimpleClass(sqlContext: SQLContext, df: DataFrame, column: String) {
def exe(): DataFrame = {
import sqlContext.implicits._
df.select(col(column))
}
}
- 有没有办法在
Pyspark
中使用这个类? - 是不是太难了?
- 我必须创建一个
.py
文件吗? - 有没有说明如何做到这一点的指南?
顺便说一下,我还查看了spark
代码,我感到有点迷茫,我无法为自己的目的复制它们的功能。
是的,这是可能的,尽管可能远非微不足道。通常,您需要一个Java(友好)包装器,这样您就不必处理Scala功能,这些功能无法使用普通Java轻松表达,因此不能很好地与Py4J网关配合使用。
假设你的类是 int 包com.example
并且有 Python DataFrame
调用df
df = ... # Python DataFrame
您必须:
使用您最喜欢的构建工具构建 jar。
例如,将其包含在驱动程序类路径中
--driver-class-path
PySpark shell/spark-submit
的参数。根据确切的代码,您可能还必须使用--jars
传递它从 Python
SparkContext
实例中提取 JVM 实例:jvm = sc._jvm
从
SQLContext
实例中提取 ScalaSQLContext
:ssqlContext = sqlContext._ssql_ctx
从
df
中提取 JavaDataFrame
:jdf = df._jdf
创建
SimpleClass
的新实例:simpleObject = jvm.com.example.SimpleClass(ssqlContext, jdf, "v")
调用
exe
方法并使用 PythonDataFrame
包装结果:from pyspark.sql import DataFrame DataFrame(simpleObject.exe(), ssqlContext)
结果应该是有效的 PySpark DataFrame
。当然,您可以将所有步骤合并到一个调用中。
重要提示:仅当 Python 代码仅在驱动程序上执行时,此方法才可用。它不能在 Python 操作或转换中使用。请参阅如何从操作或转换中使用 Java/Scala 函数?了解详情。
作为对@zero323答案的更新,鉴于Spark的API在过去六年中不断发展,在Spark-3.2中起作用的方法如下:
- 将 Scala 代码编译为 JAR 文件(例如使用
sbt assembly
) - 将 JAR 文件包含在
--jars
参数中,以便与本地包定义所需的任何--py-files
参数一起spark-submit
- 在 Python 中提取 JVM 实例:
jvm = spark._jvm
- 提取
SparkSession
的 Java 表示形式:
jSess = spark._jsparkSession
- 提取要传递给 Scala 方法的 PySpark
DataFrame
"df"的 Java 句柄:
jdf = df._jdf
- 从 PySpark 中创建一个新的
SimpleClass
实例:
simpleObject = jvm.com.example.SimpleClass(jSess, jdf, "v")
- 调用
exe
方法并将其输出转换为 PySparkDataFrame
:
from pyspark.sql import DataFrame
result = DataFrame(simpleObject.exe(), spark)
如果你需要传递其他参数,比如Python字典,PySpark可能会在它们出现在你的Scala方法之前自动将它们转换为相应的Java类型。Scala 提供了JavaConverters
包来帮助将其转换为更自然的 Scala 数据类型。例如,Python 字典可以传递到 Scala 方法中,并立即从 Java HashMap 转换为 Scala(可变)映射:
def processDict(spark: SparkSession, jparams: java.util.Map[String, Any]) {
import scala.collection.JavaConverters._
val params = jparams.asScala
...
}