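We want to write compressed data to HDFS through Flink's BucketingSink or StreamingFileSink. I have written my own Writer, and it works fine as long as no failure occurs. However, when the job hits a failure and restarts from a checkpoint, the sink either generates a valid-length file (Hadoop < 2.7) or truncates the part file. Unfortunately, gzip is a binary format with a trailer at the end of the file, so simple truncation does not work in my case. Any ideas on how to enable exactly-once semantics for a compressed HDFS sink?
Here is my writer code: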
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPOutputStream;

public class HdfsCompressStringWriter extends StreamWriterBaseV2<JSONObject> {
    private static final long serialVersionUID = 2L;

    /**
     * The {@code GZIPOutputStream} wrapping the current part file.
     */
    private transient GZIPOutputStream compressionOutputStream;

    public HdfsCompressStringWriter() {}

    @Override
    public void open(FileSystem fs, Path path) throws IOException {
        super.open(fs, path);
        this.setSyncOnFlush(true);
        // syncFlush = true: flush() pushes all pending compressed bytes to the part file
        compressionOutputStream = new GZIPOutputStream(this.getStream(), true);
    }

    @Override
    public void close() throws IOException {
        if (compressionOutputStream != null) {
            // Writes the gzip trailer and closes the underlying stream
            compressionOutputStream.close();
            compressionOutputStream = null;
        }
        resetStream();
    }

    @Override
    public void write(JSONObject element) throws IOException {
        // Skip records that carry no "body" field
        if (element == null || !element.containsKey("body")) {
            return;
        }
        String content = element.getString("body") + "\n";
        // Use an explicit charset instead of the platform default
        compressionOutputStream.write(content.getBytes(StandardCharsets.UTF_8));
        compressionOutputStream.flush();
    }

    @Override
    public Writer<JSONObject> duplicate() {
        return new HdfsCompressStringWriter();
    }
}
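To make the trailer problem concrete, here is a minimal sketch that uses only java.util.zip, with no Flink or HDFS involved. The class and method names (TruncatedGzipDemo, gzip, canDecompress) are made up for illustration. It compresses a few lines, cuts off the last 4 bytes of the result, and checks whether each version can still be read:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

// Hypothetical demo class, not part of the writer above.
final class TruncatedGzipDemo {

    /** Compresses the text into a complete gzip stream (header, deflate data, trailer). */
    static byte[] gzip(String text) throws IOException {
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        try (GZIPOutputStream gz = new GZIPOutputStream(sink)) {
            gz.write(text.getBytes(StandardCharsets.UTF_8));
        }
        return sink.toByteArray();
    }

    /** Returns true if the bytes can be decompressed all the way to the end of the stream. */
    static boolean canDecompress(byte[] data) {
        try (GZIPInputStream in = new GZIPInputStream(new ByteArrayInputStream(data))) {
            byte[] buffer = new byte[4096];
            while (in.read(buffer) != -1) {
                // Discard the output; we only care whether reading succeeds.
            }
            return true;
        } catch (IOException e) {
            // A missing or incomplete trailer surfaces as an IOException.
            return false;
        }
    }

    public static void main(String[] args) throws IOException {
        byte[] full = gzip("line 1\nline 2\n");
        // Simulate a truncate-on-restore that cuts into the 8-byte gzip trailer.
        byte[] cut = Arrays.copyOf(full, full.length - 4);
        System.out.println("complete file readable:  " + canDecompress(full));
        System.out.println("truncated file readable: " + canDecompress(cut));
    }
}
```

The complete stream reads back fine, while the truncated one fails, which is the behaviour I see on the part files after a restore.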
I would suggest implementing a BulkWriter for the StreamingFileSink that compresses the elements through a GZIPOutputStream. The code could look like this:
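import java.io.IOException;
import java.io.ObjectOutputStream;
import java.util.zip.GZIPOutputStream;

import org.apache.flink.api.common.serialization.BulkWriter;
import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;

// The enclosing class name is arbitrary; InfinitySource stands for any source of Integers.
public class GzipStreamingFileSinkJob {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // Bulk formats roll the part file on every checkpoint
        env.enableCheckpointing(1000);

        final DataStream<Integer> input = env.addSource(new InfinitySource());
        final StreamingFileSink<Integer> streamingFileSink = StreamingFileSink
            .<Integer>forBulkFormat(new Path("output"), new GzipBulkWriterFactory<>())
            .build();

        input.addSink(streamingFileSink);
        env.execute();
    }

    private static class GzipBulkWriterFactory<T> implements BulkWriter.Factory<T> {
        @Override
        public BulkWriter<T> create(FSDataOutputStream fsDataOutputStream) throws IOException {
            // syncFlush = true so that flush() pushes pending compressed bytes downstream
            final GZIPOutputStream gzipOutputStream = new GZIPOutputStream(fsDataOutputStream, true);
            return new GzipBulkWriter<>(new ObjectOutputStream(gzipOutputStream), gzipOutputStream);
        }
    }

    private static class GzipBulkWriter<T> implements BulkWriter<T> {
        private final GZIPOutputStream gzipOutputStream;
        private final ObjectOutputStream objectOutputStream;

        public GzipBulkWriter(ObjectOutputStream objectOutputStream, GZIPOutputStream gzipOutputStream) {
            this.gzipOutputStream = gzipOutputStream;
            this.objectOutputStream = objectOutputStream;
        }

        @Override
        public void addElement(T t) throws IOException {
            objectOutputStream.writeObject(t);
        }

        @Override
        public void flush() throws IOException {
            objectOutputStream.flush();
        }

        @Override
        public void finish() throws IOException {
            objectOutputStream.flush();
            // Writes the gzip trailer without closing the underlying stream; the sink closes it
            gzipOutputStream.finish();
        }
    }
}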
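The key detail in finish() is that GZIPOutputStream.finish() writes the gzip trailer but leaves the underlying stream open, which is what a BulkWriter is expected to do. The following is a minimal sketch of that behaviour using only the JDK, so it can be run without Flink. GzipFinishDemo, TrackingStream, writeAll and readAll are hypothetical names introduced for this illustration, and an in-memory stream stands in for the FSDataOutputStream:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

// Hypothetical demo class; it mirrors the flush-then-finish sequence of the writer above.
final class GzipFinishDemo {

    /** In-memory stand-in for the part file stream that records whether close() was called. */
    static final class TrackingStream extends ByteArrayOutputStream {
        boolean closed = false;

        @Override
        public void close() throws IOException {
            closed = true;
            super.close();
        }
    }

    /** Writes the elements the way the bulk writer does: writeObject, flush, then finish. */
    static byte[] writeAll(TrackingStream target, List<String> elements) throws IOException {
        GZIPOutputStream gzip = new GZIPOutputStream(target, true);
        ObjectOutputStream objects = new ObjectOutputStream(gzip);
        for (String element : elements) {
            objects.writeObject(element);
        }
        objects.flush();
        gzip.finish(); // trailer is written here; target is intentionally not closed
        return target.toByteArray();
    }

    /** Reads back the given number of elements from a finished gzip stream. */
    static List<String> readAll(byte[] data, int count) throws IOException, ClassNotFoundException {
        try (ObjectInputStream in =
                 new ObjectInputStream(new GZIPInputStream(new ByteArrayInputStream(data)))) {
            List<String> result = new ArrayList<>();
            for (int i = 0; i < count; i++) {
                result.add((String) in.readObject());
            }
            return result;
        }
    }

    public static void main(String[] args) throws Exception {
        TrackingStream target = new TrackingStream();
        byte[] data = writeAll(target, Arrays.asList("a", "b", "c"));
        System.out.println("underlying stream closed: " + target.closed);
        System.out.println("elements read back: " + readAll(data, 3));
    }
}
```

Note that ObjectOutputStream produces Java serialization data rather than text. If the files should contain plain lines, as in the question, the same structure should work with addElement writing UTF-8 bytes directly to the GZIPOutputStream.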