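(Apache Flink 1.8 on AWS EMR release label 5.28.x)
Our data source is an AWS Kinesis stream (with 450 shards, if that matters). We use FlinkKinesisConsumer to read the Kinesis stream. Occasionally (about once every few days) our application crashes with a "target server failed to respond" error. The full stack trace is at the bottom.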
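Digging further into the codebase, I found that ProvisionedThroughputExceededException is the only exception type that gets retried (see the connector code).
1. Why doesn't the Kinesis connector retry transient HTTP response exceptions?
2. Is there a way to pass in a retry configuration so that these errors are retried?
As a side note, we have set the following retry configuration: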
env.setRestartStrategy(RestartStrategies.failureRateRestart(12,
org.apache.flink.api.common.time.Time.of(60, TimeUnit.MINUTES),
org.apache.flink.api.common.time.Time.of(300, TimeUnit.SECONDS)));
Full stack trace of the exception:
at org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleRetryableException(AmazonHttpClient.java:1201)
at org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1147)
at org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:796)
at org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:764)
at org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:738)
at org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:698)
at org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:680)
at org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:544)
at org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:524)
at org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.AmazonKinesisClient.doInvoke(AmazonKinesisClient.java:2809)
at org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.AmazonKinesisClient.invoke(AmazonKinesisClient.java:2776)
at org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.AmazonKinesisClient.invoke(AmazonKinesisClient.java:2765)
at org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.AmazonKinesisClient.executeGetRecords(AmazonKinesisClient.java:1292)
at org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.AmazonKinesisClient.getRecords(AmazonKinesisClient.java:1263)
at org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy.getRecords(KinesisProxy.java:250)
at org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer.getRecords(ShardConsumer.java:400)
at org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer.run(ShardConsumer.java:243)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
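KinesisProxy does support retrying exceptions, and the retry behavior can be controlled through the settings mentioned in the previous answer. However, not every exception is retried, and the default whitelist does not cover all of the transient problems that commonly occur with the Kinesis service. Over time, we customized the proxy as follows to arrive at a stable production setup:
// Override in a subclass of KinesisProxy. The AWS SDK types are the connector's
// shaded classes (org.apache.flink.kinesis.shaded.com.amazonaws..., as in the stack trace above).
@Override
protected boolean isRecoverableSdkClientException(SdkClientException ex) {
    if (ex instanceof KMSThrottlingException) {
        // not handled in KinesisProxy in 1.5.x
        return true;
    } else if (ex instanceof AmazonServiceException) {
        // Fall back to the connector's own classification of service errors
        return KinesisProxy.isRecoverableException((AmazonServiceException) ex);
    } else if (ex.getCause() instanceof SocketTimeoutException) {
        return true;
    } else if (ex.getCause() instanceof NoHttpResponseException) {
        // "The target server failed to respond"
        return true;
    } else if (ex.getCause() instanceof ConnectTimeoutException) {
        return true;
    } else if (ex.getCause() instanceof java.net.UnknownHostException) {
        return true;
    } else if (ex.getCause() instanceof javax.net.ssl.SSLHandshakeException) {
        return true;
    }
    // Anything else is treated as non-recoverable and fails the shard consumer
    return false;
}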
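The override above inspects only the immediate cause (ex.getCause()), so a transient network error wrapped one level deeper would still be treated as fatal. A minimal, self-contained sketch, assuming you would rather walk the whole cause chain; the class TransientCauseClassifier and its method are hypothetical and not part of Flink. It matches on simple class names, which sidesteps the question of whether a given HTTP class is shaded in the connector jar:

```java
import java.net.SocketTimeoutException;
import java.net.UnknownHostException;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

// Hypothetical helper, not a Flink API. Java 8 compatible.
class TransientCauseClassifier {
    // Simple class names treated as transient; mirrors the causes checked in the override.
    private static final Set<String> TRANSIENT_CAUSES = new HashSet<>(Arrays.asList(
            "SocketTimeoutException",
            "NoHttpResponseException",
            "ConnectTimeoutException",
            "UnknownHostException",
            "SSLHandshakeException"));

    // Depth limit guards against pathologically long or cyclic cause chains.
    private static final int MAX_DEPTH = 16;

    /** Returns true if the throwable or any of its causes has a transient simple name. */
    static boolean hasTransientCause(Throwable ex) {
        Throwable current = ex;
        for (int depth = 0; current != null && depth < MAX_DEPTH; depth++) {
            if (TRANSIENT_CAUSES.contains(current.getClass().getSimpleName())) {
                return true;
            }
            current = current.getCause();
        }
        return false;
    }

    public static void main(String[] args) {
        Throwable wrapped = new RuntimeException("Unable to execute HTTP request",
                new SocketTimeoutException("Read timed out"));
        Throwable deep = new RuntimeException("outer",
                new IllegalStateException("middle", new UnknownHostException("kinesis host")));
        Throwable permanent = new RuntimeException("bad request",
                new IllegalArgumentException("invalid shard id"));

        System.out.println(hasTransientCause(wrapped));   // true
        System.out.println(hasTransientCause(deep));      // true
        System.out.println(hasTransientCause(permanent)); // false
    }
}
```

Inside the KinesisProxy override, a single branch calling hasTransientCause(ex) could replace the individual getCause() checks. Matching on names is looser than instanceof: a subclass with a different simple name will not match, and an unrelated class that happens to share a name will.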
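The restart strategy you configured with env.setRestartStrategy() restarts the entire Flink job when a failure occurs. It does not change how the Kinesis connector inside the job retries individual requests.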
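With the numbers in the question, failureRateRestart(12, 60 minutes, 300 seconds) tolerates up to 12 job failures within a 60-minute window, waits 300 seconds before each restart, and fails the job for good once that rate is exceeded. Every Kinesis error that escapes the connector's own retries counts as one such job failure. A simplified model of the sliding-window check follows; FailureRatePolicy is a hypothetical name, the code is not Flink's implementation, and it ignores the restart delay:

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Simplified model of a failure-rate restart policy (hypothetical, not Flink's class).
class FailureRatePolicy {
    private final int maxFailuresPerInterval;
    private final long intervalMillis;
    private final Deque<Long> failureTimes = new ArrayDeque<>();

    FailureRatePolicy(int maxFailuresPerInterval, long intervalMillis) {
        this.maxFailuresPerInterval = maxFailuresPerInterval;
        this.intervalMillis = intervalMillis;
    }

    /** Records a failure at nowMillis; returns true if the job may still be restarted. */
    boolean onFailure(long nowMillis) {
        // Forget failures that have slid out of the window.
        while (!failureTimes.isEmpty() && nowMillis - failureTimes.peekFirst() >= intervalMillis) {
            failureTimes.pollFirst();
        }
        failureTimes.addLast(nowMillis);
        return failureTimes.size() <= maxFailuresPerInterval;
    }

    public static void main(String[] args) {
        long minute = 60_000L;
        // Same limits as the question: at most 12 failures per 60 minutes.
        FailureRatePolicy policy = new FailureRatePolicy(12, 60 * minute);
        boolean allowed = true;
        for (int i = 0; i < 13; i++) {
            allowed = policy.onFailure(i * minute); // one failure per minute
        }
        System.out.println(allowed); // false: the 13th failure falls within the hour
    }
}
```

The model makes the answer's point concrete: a restart strategy only decides whether the whole job comes back; it never retries the failed GetRecords call itself.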
The Kinesis consumer has the following configuration settings (as of 1.11) for changing its retry behavior:
/** The maximum number of records to try to get each time we fetch records from an AWS Kinesis shard. */
public static final String SHARD_GETRECORDS_MAX = "flink.shard.getrecords.maxrecordcount";
/** The maximum number of getRecords attempts if we get a recoverable exception. */
public static final String SHARD_GETRECORDS_RETRIES = "flink.shard.getrecords.maxretries";
/** The base backoff time between getRecords attempts if we get a ProvisionedThroughputExceededException. */
public static final String SHARD_GETRECORDS_BACKOFF_BASE = "flink.shard.getrecords.backoff.base";
/** The maximum backoff time between getRecords attempts if we get a ProvisionedThroughputExceededException. */
public static final String SHARD_GETRECORDS_BACKOFF_MAX = "flink.shard.getrecords.backoff.max";
/** The power constant for exponential backoff between each getRecords attempt. */
public static final String SHARD_GETRECORDS_BACKOFF_EXPONENTIAL_CONSTANT = "flink.shard.getrecords.backoff.expconst";
/** The interval between each getRecords request to an AWS Kinesis shard in milliseconds. */
public static final String SHARD_GETRECORDS_INTERVAL_MILLIS = "flink.shard.getrecords.intervalmillis";
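A sketch of how these keys could be set, using the literal key strings listed above and purely illustrative values. It also models how base, max and expconst could combine into a delay, assuming a full-jitter exponential backoff (a uniform random delay below min(max, base * expconst^attempt)); treat that formula as an assumption rather than the connector's exact code. GetRecordsRetrySettings and its methods are hypothetical names:

```java
import java.util.Properties;
import java.util.Random;

// Hypothetical helper; only the property key strings come from the constants above.
class GetRecordsRetrySettings {
    static Properties consumerProperties() {
        Properties props = new Properties();
        // Illustrative values, not recommendations or defaults.
        props.setProperty("flink.shard.getrecords.maxretries", "10");
        props.setProperty("flink.shard.getrecords.backoff.base", "500");
        props.setProperty("flink.shard.getrecords.backoff.max", "5000");
        props.setProperty("flink.shard.getrecords.backoff.expconst", "2.0");
        return props;
    }

    /** Upper bound of the delay for a given attempt: min(max, base * expConst^attempt). */
    static long capMillis(long base, long max, double expConst, int attempt) {
        return (long) Math.min(max, base * Math.pow(expConst, attempt));
    }

    /** Full-jitter delay: uniformly distributed in [0, cap). */
    static long backoffMillis(Properties props, int attempt, Random rnd) {
        long base = Long.parseLong(props.getProperty("flink.shard.getrecords.backoff.base"));
        long max = Long.parseLong(props.getProperty("flink.shard.getrecords.backoff.max"));
        double expConst = Double.parseDouble(props.getProperty("flink.shard.getrecords.backoff.expconst"));
        return (long) (rnd.nextDouble() * capMillis(base, max, expConst, attempt));
    }

    public static void main(String[] args) {
        Properties props = consumerProperties();
        int maxRetries = Integer.parseInt(props.getProperty("flink.shard.getrecords.maxretries"));
        Random rnd = new Random(42);
        for (int attempt = 0; attempt < maxRetries; attempt++) {
            System.out.println("attempt " + attempt + ": sleep "
                    + backoffMillis(props, attempt, rnd) + " ms");
        }
    }
}
```

The resulting Properties object is what you would pass as the configuration argument of new FlinkKinesisConsumer<>(streamName, schema, props). Raising maxretries only helps for exceptions the proxy classifies as recoverable, which is why the isRecoverableSdkClientException override is still needed for errors like "target server failed to respond".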