在开始使用Spark之后,我正在努力熟悉Flink的语义。我想将DataSet[IndexNode]
写入HDFS中的永久存储,以便稍后由另一个进程读取。Spark有一个简单的ObjectFile
API来提供这样的功能,但我在Flink中找不到类似的选项。
case class IndexNode(vec: Vector[IndexNode],
id: Int) extends Serializable {
// Getters and setters etc. here
}
内置接收器倾向于基于toString
方法序列化实例,由于类的嵌套结构,这在这里不合适。我认为解决方案是使用FileOutputFormat
并将实例转换为字节流。然而,我不知道如何序列化向量,它的长度是任意的,可能有很多级别。
您可以通过使用SerializedOutputFormat
和SerializedInputFormat
来实现这一点。
尝试以下步骤:
-
使
IndexNode
从FLINK扩展IOReadableWritable
接口。将不可序列化的字段设为@transient
。实现write(DataOutputView out)
和read(DataInputView in)
方法。写入方法将从IndexNode
中写入所有数据,读取方法将读回这些数据并构建所有内部数据字段。例如,我不串行化Result
类中arr
字段的所有数据,而是写出所有值,然后读回它们,并在read方法中重建数组。class Result(var name: String, var count: Int) extends IOReadableWritable { @transient var arr = Array(count, count) def this() { this("", 1) } override def write(out: DataOutputView): Unit = { out.writeInt(count) out.writeUTF(name) } override def read(in: DataInputView): Unit = { count = in.readInt() name = in.readUTF() arr = Array(count, count) } override def toString: String = s"$name, $count, ${getArr}" }
-
使用写入数据
myDataSet.write(new SerializedOutputFormat[Result], "/tmp/test")
并用读回
env.readFile(new SerializedInputFormat[Result], "/tmp/test")