我想使用Dataset.map函数来转换数据集的行。样本如下:
val result = testRepository.readTable(db, tableName)
.map(testInstance.doSomeOperation)
.count()
其中testInstance
是扩展java.io.Serializable
的类,但testRepository
确实扩展了它。该代码引发以下错误:
Job aborted due to stage failure.
Caused by: NotSerializableException: TestRepository
问题
我理解为什么testInstance.doSomeOperation
需要序列化,因为它在映射中,并且将分发给Spark工作人员。但是为什么testRepository
需要序列化呢?我不明白为什么这对地图来说是必要的。将定义更改为class TestRepository extends java.io.Serializable
解决了这个问题,但在更大的项目上下文中这是不可取的。
有没有一种方法可以在不使TestRepository可序列化的情况下实现这一点,或者为什么要求它是可序列化的?
最小工作示例
下面是一个完整的例子,其中包含两个类中的代码,它们再现了NotSerializableException:
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
case class MyTableSchema(id: String, key: String, value: Double)
val db = "temp_autodelete"
val tableName = "serialization_test"
class TestRepository extends java.io.Serializable {
def readTable(database: String, tableName: String): Dataset[MyTableSchema] = {
spark.table(f"$database.$tableName")
.as[MyTableSchema]
}
}
val testRepository = new TestRepository()
class TestClass() extends java.io.Serializable {
def doSomeOperation(row: MyTableSchema): MyTableSchema = {
row
}
}
val testInstance = new TestClass()
val result = testRepository.readTable(db, tableName)
.map(testInstance.doSomeOperation)
.count()
原因是您的map
操作正在读取执行器上已经发生的内容。
如果你看看你的管道:
val result = testRepository.readTable(db, tableName)
.map(testInstance.doSomeOperation)
.count()
你要做的第一件事是testRepository.readTable(db, tableName)
。如果我们查看readTable
方法的内部,我们会发现您在其中执行spark.table
操作。如果我们从API文档中查看此方法的函数签名,我们会看到以下函数签名:
def table(tableName: String): DataFrame
这不是一个只在驱动程序上进行的操作(想象一下,在只在驱动器上读取>1TB的文件),它创建了一个Dataframe(它本身就是一个分布式数据集)。这意味着testRepository.readTable(db, tableName)
函数需要分布式,因此testRepository
对象需要分布式。
希望这对你有帮助!