我用Apache Flink在HDFS上创建了一些存档数据文件,生成的文件名具有类似于part-{parallel-task}-{count}的模式,但我期望的应该有".gz"后缀,可以直接由Apache Spark加载。
我找不到任何 API 来为在 Apache Flink 中由 BucketingSink 生成的最终完成文件添加后缀,但只能将后缀添加到 InProgress、Pending 和 ValidLength 状态。有人可以帮忙吗? HDFS Connector & Java API
据我所知,没有使用默认 BucketingSink 添加后缀的选项。
一种选择是不使用检查点并将挂起的后缀设置为所需的后缀。但是,由于在大多数情况下检查点是可取的,因此这不是最佳选择。
我的解决方案是创建一个 BucketingSinkWithSuffix 实现,它几乎是默认 BucketingSink 的精确副本。唯一需要更改的是为后缀添加一个成员变量,该变量可以在构造函数中设置并调整创建基路径的方式。
这是我对构造函数的实现:
public BucketingSinkWithSuffix(String basePath, String suffix) {
this.basePath = basePath;
this.bucketer = new DateTimeBucketer<>();
this.writerTemplate = new StringWriter<>();
this.partSuffix = suffix;
}
对于生成基本路径(第 523 和 528 行(:
partPath = new Path(bucketPath, partPrefix + "-" + subtaskIndex + "-" + bucketState.partCounter + partSuffix);