如何在 Apache Flink 中为 BucketingSink 生成的最终完成文件添加后缀



我用Apache Flink在HDFS上创建了一些存档数据文件,生成的文件名具有类似于part-{parallel-task}-{count}的模式,但我期望的应该有".gz"后缀,可以直接由Apache Spark加载。

我找不到任何 API 来为在 Apache Flink 中由 BucketingSink 生成的最终完成文件添加后缀,但只能将后缀添加到 InProgress、Pending 和 ValidLength 状态。有人可以帮忙吗? HDFS Connector & Java API

据我所知,没有使用默认 BucketingSink 添加后缀的选项。

一种选择是不使用检查点并将挂起的后缀设置为所需的后缀。但是,由于在大多数情况下检查点是可取的,因此这不是最佳选择。

我的解决方案是创建一个 BucketingSinkWithSuffix 实现,它几乎是默认 BucketingSink 的精确副本。唯一需要更改的是为后缀添加一个成员变量,该变量可以在构造函数中设置并调整创建基路径的方式。

这是我对构造函数的实现:

    public BucketingSinkWithSuffix(String basePath, String suffix) {
    this.basePath = basePath;
    this.bucketer = new DateTimeBucketer<>();
    this.writerTemplate = new StringWriter<>();
    this.partSuffix = suffix;
}

对于生成基本路径(第 523 和 528 行(:

partPath = new Path(bucketPath, partPrefix + "-" + subtaskIndex + "-" + bucketState.partCounter + partSuffix);

相关内容

  • 没有找到相关文章

最新更新