Apache Flink StreamingFileSink在写入S3时发出了多个HEAD请求,这导致了速率限制



我有一个Apache Flink应用程序,我已经在Kinesis Data analytics上部署了它。

这个应用程序从Kafka中读取并写入S3。它写入的S3 bucket结构是使用BucketAssigner计算的

我的问题是,假设我们必须写入这个目录结构:s3://myBucket/folder1/folder2/folder3/myFile.json

在发出PUT请求之前,它会发出以下HEAD请求:

  • HEAD /folder1
  • HEAD /folder1/folder2
  • HEAD /folder1/folder2/folder3/

然后它发出PUT请求。

它对每个请求都这样做,这导致了S3速率限制,并且在我的FLink应用程序中存在背压。

我发现有人对BucketingSink有类似的问题:https://lists.apache.org/thread/rbp2gdbxwdrk7zmvwhd2bw56mlwokpzz

上面提到的解决方案是切换到StreamingFileSink,这就是我正在做的。

关于如何在StreamingFileSink中修复此问题,有什么想法吗?

我的SinkConfig如下:

StreamingFileSink
.forRowFormat(new Path(s3Bucket), new JsonEncoder<>())
.withBucketAssigner(bucketAssigner)
.withRollingPolicy(DefaultRollingPolicy.builder()
.withRolloverInterval(60000)
.build())
.build()

JsonEncoder获取对象并将其转换为json,并写出类似的字节

我已经在这个问题中描述了关于整个管道如何工作的更多细节,如果这有帮助的话:沉重的背压和巨大的检查点大小的

HadoopS3文件系统试图模仿S3之上的文件系统。这意味着:
  • 在写入密钥之前;父目录";通过检查前缀直到最后一个"0"的密钥而存在/">
  • 它创建空的标记文件来标记此类父目录的存在
  • 所有这些";存在";请求是S3 HEAD请求,它们既昂贵又在创建可见性后开始违反一致性读取

因此,Hadoop S3文件系统具有非常高的";创建文件";延迟,并且它非常快地达到请求速率限制(HEAD请求在S3上具有非常低的请求速率限制(。因此,最好找到写入较少不同文件的方法。

您还可以探索使用熵注入。熵注入发生在文件系统级别,因此它应该与FileSink一起工作。但我不确定它将如何与接收器进行的分区/bucketing交互,所以您可能会发现它在实践中可用,也可能不会。如果你尝试了,请反馈!

最新更新