How to make a Flink sink to HDFS fault-tolerant when writing gzip-compressed data



We want to write compressed data to HDFS via Flink's BucketingSink or StreamingFileSink. I have written my own writer, and it works fine as long as no failure occurs. However, when a failure happens and the job restarts from a checkpoint, the sink produces valid-length files (on Hadoop < 2.7) or truncates the part file. Unfortunately, gzip is a binary format with a trailer at the end of the file, so simple truncation does not work in my case. Is there any way to get exactly-once semantics for a compressed HDFS sink?

Here is my writer code:

public class HdfsCompressStringWriter extends StreamWriterBaseV2<JSONObject> {

    private static final long serialVersionUID = 2L;

    /**
     * The {@code GZIPOutputStream} for the current part file.
     */
    private transient GZIPOutputStream compressionOutputStream;

    public HdfsCompressStringWriter() {}

    @Override
    public void open(FileSystem fs, Path path) throws IOException {
        super.open(fs, path);
        this.setSyncOnFlush(true);
        // syncFlush = true so that flush() pushes compressed bytes to the underlying stream
        compressionOutputStream = new GZIPOutputStream(this.getStream(), true);
    }

    @Override
    public void close() throws IOException {
        if (compressionOutputStream != null) {
            compressionOutputStream.close();
            compressionOutputStream = null;
        }
        resetStream();
    }

    @Override
    public void write(JSONObject element) throws IOException {
        if (element == null || !element.containsKey("body")) {
            return;
        }
        String content = element.getString("body") + "\n";
        compressionOutputStream.write(content.getBytes(StandardCharsets.UTF_8));
        compressionOutputStream.flush();
    }

    @Override
    public Writer<JSONObject> duplicate() {
        return new HdfsCompressStringWriter();
    }
}

I would recommend implementing a BulkWriter for the StreamingFileSink that compresses elements through a GZIPOutputStream. With a bulk format, the sink rolls the part file on every checkpoint and calls finish() before committing it, so every committed file carries a complete gzip trailer. The code could look like this:

public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);
    env.enableCheckpointing(1000);

    final DataStream<Integer> input = env.addSource(new InfinitySource());

    // Bulk formats roll the part file on every checkpoint, so each
    // committed file is a complete, valid gzip stream.
    final StreamingFileSink<Integer> streamingFileSink = StreamingFileSink
        .<Integer>forBulkFormat(new Path("output"), new GzipBulkWriterFactory<>())
        .build();

    input.addSink(streamingFileSink);
    env.execute();
}

private static class GzipBulkWriterFactory<T> implements BulkWriter.Factory<T> {
    @Override
    public BulkWriter<T> create(FSDataOutputStream fsDataOutputStream) throws IOException {
        // syncFlush = true so flush() propagates compressed bytes downstream
        final GZIPOutputStream gzipOutputStream = new GZIPOutputStream(fsDataOutputStream, true);
        return new GzipBulkWriter<>(new ObjectOutputStream(gzipOutputStream), gzipOutputStream);
    }
}
private static class GzipBulkWriter<T> implements BulkWriter<T> {
    private final GZIPOutputStream gzipOutputStream;
    private final ObjectOutputStream objectOutputStream;
    public GzipBulkWriter(ObjectOutputStream objectOutputStream, GZIPOutputStream gzipOutputStream) {
        this.gzipOutputStream = gzipOutputStream;
        this.objectOutputStream = objectOutputStream;
    }
    @Override
    public void addElement(T t) throws IOException {
        objectOutputStream.writeObject(t);
    }
    @Override
    public void flush() throws IOException {
        objectOutputStream.flush();
    }
    @Override
    public void finish() throws IOException {
        objectOutputStream.flush();
        gzipOutputStream.finish();
    }
}
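The reason this works is the contract of GZIPOutputStream itself: flush() with syncFlush enabled pushes compressed data downstream, but only finish() writes the gzip trailer (CRC-32 and length) that makes the file a valid .gz archive. Since the StreamingFileSink calls finish() before committing a part file, committed files are never missing the trailer. A minimal standalone sketch of that behavior (plain Java, no Flink; the class name GzipFinishDemo is just for illustration):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

public class GzipFinishDemo {

    // Compress with syncFlush enabled, then call finish() so the gzip
    // trailer (CRC + length) is written and the stream is a valid .gz file.
    static byte[] gzip(String text) throws Exception {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        GZIPOutputStream gz = new GZIPOutputStream(bos, true); // syncFlush = true
        gz.write(text.getBytes(StandardCharsets.UTF_8));
        gz.flush();  // pushes compressed bytes, but no trailer yet
        gz.finish(); // writes the trailer; a file cut before this point is truncated
        return bos.toByteArray();
    }

    static String gunzip(byte[] data) throws Exception {
        GZIPInputStream in = new GZIPInputStream(new ByteArrayInputStream(data));
        return new String(in.readAllBytes(), StandardCharsets.UTF_8);
    }

    public static void main(String[] args) throws Exception {
        String original = "line1\nline2\n";
        String roundTripped = gunzip(gzip(original));
        System.out.println(roundTripped.equals(original)); // true
    }
}
```

Note that the answer's GzipBulkWriter serializes elements with ObjectOutputStream, so the decompressed content is Java serialization data, not plain text; if you need gunzip-readable line output, write the bytes of each record plus a newline directly to the GZIPOutputStream instead.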
