为什么Spark Dataset.map要求查询的所有部分都是可序列化的



我想使用Dataset.map函数来转换数据集的行。样本如下:

val result = testRepository.readTable(db, tableName)
.map(testInstance.doSomeOperation)
.count()

其中testInstance是扩展java.io.Serializable的类,但testRepository确实扩展了它。该代码引发以下错误:

Job aborted due to stage failure.
Caused by: NotSerializableException: TestRepository

问题

我理解为什么testInstance.doSomeOperation需要序列化,因为它在映射中,并且将分发给Spark工作人员。但是为什么testRepository需要序列化呢?我不明白为什么这对地图来说是必要的。将定义更改为class TestRepository extends java.io.Serializable解决了这个问题,但在更大的项目上下文中这是不可取的。

有没有一种方法可以在不使TestRepository可序列化的情况下实现这一点,或者为什么要求它是可序列化的?

最小工作示例

下面是一个完整的例子,其中包含两个类中的代码,它们再现了NotSerializableException:

import org.apache.spark.sql._
import org.apache.spark.sql.functions._
case class MyTableSchema(id: String, key: String, value: Double)
val db = "temp_autodelete"
val tableName = "serialization_test"
class TestRepository extends java.io.Serializable {
def readTable(database: String, tableName: String): Dataset[MyTableSchema] = {
spark.table(f"$database.$tableName")
.as[MyTableSchema]
}
}
val testRepository = new TestRepository()
class TestClass() extends java.io.Serializable {
def doSomeOperation(row: MyTableSchema): MyTableSchema = {
row 
}
}
val testInstance = new TestClass()
val result = testRepository.readTable(db, tableName)
.map(testInstance.doSomeOperation)
.count()

原因是您的map操作正在读取执行器上已经发生的内容。

如果你看看你的管道:

val result = testRepository.readTable(db, tableName)
.map(testInstance.doSomeOperation)
.count()

你要做的第一件事是testRepository.readTable(db, tableName)。如果我们查看readTable方法的内部,我们会发现您在其中执行spark.table操作。如果我们从API文档中查看此方法的函数签名,我们会看到以下函数签名:

def table(tableName: String): DataFrame

这不是一个只在驱动程序上进行的操作(想象一下,在只在驱动器上读取>1TB的文件),它创建了一个Dataframe(它本身就是一个分布式数据集)。这意味着testRepository.readTable(db, tableName)函数需要分布式,因此testRepository对象需要分布式。

希望这对你有帮助!

相关内容

  • 没有找到相关文章

最新更新