在ApacheFlink中为HDFS编写自定义类



在开始使用Spark之后,我正在努力熟悉Flink的语义。我想将DataSet[IndexNode]写入HDFS中的永久存储,以便稍后由另一个进程读取。Spark有一个简单的ObjectFileAPI来提供这样的功能,但我在Flink中找不到类似的选项。

case class IndexNode(vec: Vector[IndexNode],
id: Int) extends Serializable {
// Getters and setters etc. here
}

内置接收器倾向于基于toString方法序列化实例,由于类的嵌套结构,这在这里不合适。我认为解决方案是使用FileOutputFormat并将实例转换为字节流。然而,我不知道如何序列化向量,它的长度是任意的,可能有很多级别。

您可以通过使用SerializedOutputFormatSerializedInputFormat来实现这一点。

尝试以下步骤:

  1. 使IndexNode从FLINK扩展IOReadableWritable接口。将不可序列化的字段设为@transient。实现write(DataOutputView out)read(DataInputView in)方法。写入方法将从IndexNode中写入所有数据,读取方法将读回这些数据并构建所有内部数据字段。例如,我不串行化Result类中arr字段的所有数据,而是写出所有值,然后读回它们,并在read方法中重建数组。

    class Result(var name: String, var count: Int) extends IOReadableWritable {
    @transient
    var arr = Array(count, count)
    def this() {
    this("", 1)
    }
    override def write(out: DataOutputView): Unit = {
    out.writeInt(count)
    out.writeUTF(name)
    }
    override def read(in: DataInputView): Unit = {
    count = in.readInt()
    name = in.readUTF()
    arr = Array(count, count)
    }
    override def toString: String = s"$name, $count, ${getArr}"
    }
    
  2. 使用写入数据

    myDataSet.write(new SerializedOutputFormat[Result], "/tmp/test")
    

    并用读回

    env.readFile(new SerializedInputFormat[Result], "/tmp/test")
    

相关内容

  • 没有找到相关文章

最新更新