我正在开发一个应用程序,该应用程序将一些文件上传到 s3 存储桶,稍后,它会从 s3 存储桶读取文件并将其推送到我的数据库。
我正在使用Flink 1.4.2和fs.s3a API 从 s3存储桶读取和写入文件。
将文件上传到 s3 存储桶工作正常,没有任何问题,但是当我的应用程序从 s3 读取这些上传文件的第二阶段开始时,我的应用程序会抛出以下错误:
Caused by: java.io.InterruptedIOException: Reopen at position 0 on s3a://myfilepath/a/b/d/4: org.apache.flink.fs.s3hadoop.shaded.com.amazonaws.SdkClientException: Unable to execute HTTP request: Timeout waiting for connection from pool
at org.apache.flink.fs.s3hadoop.shaded.org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:125)
at org.apache.flink.fs.s3hadoop.shaded.org.apache.hadoop.fs.s3a.S3AInputStream.reopen(S3AInputStream.java:155)
at org.apache.flink.fs.s3hadoop.shaded.org.apache.hadoop.fs.s3a.S3AInputStream.lazySeek(S3AInputStream.java:281)
at org.apache.flink.fs.s3hadoop.shaded.org.apache.hadoop.fs.s3a.S3AInputStream.read(S3AInputStream.java:364)
at java.io.DataInputStream.read(DataInputStream.java:149)
at org.apache.flink.fs.s3hadoop.shaded.org.apache.flink.runtime.fs.hdfs.HadoopDataInputStream.read(HadoopDataInputStream.java:94)
at org.apache.flink.api.common.io.DelimitedInputFormat.fillBuffer(DelimitedInputFormat.java:702)
at org.apache.flink.api.common.io.DelimitedInputFormat.open(DelimitedInputFormat.java:490)
at org.apache.flink.api.common.io.GenericCsvInputFormat.open(GenericCsvInputFormat.java:301)
at org.apache.flink.api.java.io.CsvInputFormat.open(CsvInputFormat.java:53)
at org.apache.flink.api.java.io.PojoCsvInputFormat.open(PojoCsvInputFormat.java:160)
at org.apache.flink.api.java.io.PojoCsvInputFormat.open(PojoCsvInputFormat.java:37)
at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:145)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
at java.lang.Thread.run(Thread.java:748)
我能够通过增加 s3a API 的最大连接参数来控制此错误。
截至目前,我在s1000 存储桶中有大约 3 个文件,这些文件由我在 s3 存储桶中的应用程序推送和拉取,我的最大连接数为 3000。我正在使用 Flink 的并行性从 s3 存储桶上传/下载这些文件。我的任务管理器计数为 14。 这是一个间歇性故障,我也有这种情况的成功案例。
我的查询是,
- 为什么我遇到间歇性故障?如果我设置的最大连接数较低,则我的应用程序应该在每次运行时引发此错误。
- 有没有办法计算我的应用程序在不遇到连接池超时错误的情况下工作所需的最佳最大连接数?或者此错误是否与我不知道的其他内容有关?
谢谢 提前
一些评论,基于我通过 Flink(批处理)工作流程处理来自 S3 的大量文件的经验:
- 当你读取文件时,Flink 会根据文件的数量和每个文件的大小来计算"拆分"。每个拆分都是单独读取的,因此同时连接的理论最大 # 不是基于文件的 #,而是文件和文件大小的组合。
- HTTP 客户端使用的连接池在一段时间后释放连接,因为能够重用现有连接是一种胜利(服务器/客户端握手不必发生)。因此,这会对池中的可用连接数引入一定程度的随机性。
- 连接池的大小对内存的影响不大,因此我通常将其设置得相当高(例如,对于最近的工作流,4096)。
- 使用 AWS 连接代码时,要颠簸的设置为
fs.s3.maxConnections
,这与纯 Hadoop 配置不同。