我有一个从CSV文件中读取DataSet
:
val dataSet = env.readCsvFile[ElecNormNew](
getClass.getResource("/elecNormNew.arff").getPath,
pojoFields = Array("date", "day", "period", "nswprice", "nswdemand", "vicprice", "vicdemand", "transfer", "label")
据我所知,ElecNormNew
是一个POJO:
// elecNormNew POJO
class ElecNormNew(
var date: Double,
var day: Int,
var period: Double,
var nswprice: Double,
var nswdemand: Double,
var vicprice: Double,
var vicdemand: Double,
var transfer: Double,
var label: String) extends Serializable {
def this() = {
this(0, 0, 0, 0, 0, 0, 0, 0, "")
}
}
我也有一个简单的类:
case class Discretizer[T](
data: DataSet[T],
nBins: Int = 5,
s: Int = 1000) {
private[this] val log = LoggerFactory.getLogger("Discretizer")
private[this] val V = Vector.tabulate(10)(_ => IntervalHeap(nBins, 1, 1, s))
private[this] def updateSamples(x: T): Vector[IntervalHeap] = {
log.warn(s"$x")
V
}
def discretize() = {
data map (x => updateSamples(x))
}
}
但是当我尝试使用它时,例如来自测试:
val a = new Discretizer[ElecNormNew](dataSet)
a.discretize
我收到以下错误:
org.apache.flink.api.common.InvalidProgramException: Task not serializable
// ...
[info] at com.elbauldelprogramador.discretizers.IDADiscretizer.discretize(IDADiscretizer.scala:69)
// ...
[info] Cause: java.io.NotSerializableException: org.apache.flink.api.scala.DataSet
// ...
我已经阅读了这个问题及其答案,但没有运气:
- 任务不可序列化 Flink
- 任务在 scala 中不可序列化
- 任务不可序列化:java.io.NotSerializable仅在闭包之外对类而不是对象调用函数时异常
我会说你提到的第一个链接提供了答案:
问题是您从 MapFunction 中引用数据集页面。这是不可能的,因为数据集只是数据流的逻辑表示形式,无法在运行时访问。
discretize
使用map
,所以这里也适用。