我有一个在AWS EMR中运行的具有高并行性(400(的Flink应用程序。它使用BucketingSink(使用RocksDb后端进行检查点设置(来获取Kafka并接收到S3。目的地是使用"s3a://"前缀定义的。Flink作业是一个连续运行的流媒体应用程序。在任何给定的时间,所有工作程序加在一起都有可能生成/写入400个文件(由于400并行性(。几天后,其中一个工人将失败,例外情况是:
org.apache.hadoop.fs.s3a.AWSS3IOException: copyFile(bucket/2018-09-01/05/_file-10-1.gz.in-progress, bucket/2018-09-01/05/_file-10-1.gz.pending): com.amazonaws.services.s3.model.AmazonS3Exception: We encountered an internal error. Pelase try again. (Service: Amazon S3; Status Code: 200 InternalError; Request ID: xxxxxxxxxx; S3 Extended Request ID: yyyyyyyyyyyyyyy
at org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java: 178)
at org.apache.hadoop.fs.s3a.S3AFileSystem.copyFile(S3AFileSystem.java: 1803)
at org.apache.hadoop.fs.s3a.S3AFileSystem.innerRename(S3AFileSystem.java:776)
at org.apache.hadoop.fs.s3a.S3AFileSystem.rename(S3AFileSystem.java:662)
at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.closeCurrentPartFile(BucketingSink.java:575)
at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.openNewPartFile(BucketingSink.java:514)
at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.invoke(BucketingSink.java:446)
当BucketingSink创建新的零件文件时,这种情况似乎是随机发生的。奇怪的是,这种情况是随机发生的,当它发生时,它发生在一个平行的燧发枪手身上(不是所有人(。此外,当这种情况发生时,Flink作业将转换为FAILING状态,但Flink作业不会重新启动,也不会从上一个成功的检查点恢复。造成这种情况的原因是什么?应该如何解决?此外,如何将作业配置为从上一个成功的检查点重新启动/恢复,而不是保持在FAILING状态?
我认为这是bucketing sink和S3的已知行为,建议的解决方案是在Flink 1.7.0中使用闪亮的新StreamingFileSink。
基本上,bucketing-sink希望写入和重命名像在真实文件系统中一样立即发生,但对于S3这样的对象存储来说,这不是一个好的假设,因此bucketing-sink最终会出现导致间歇性问题的竞争条件。这是一张描述这个问题的JIRA票证,相关票证使它更加充实。JIRA FLINK-9752