我正试图将一些数据从kafka流式传输到s3(使用s3a协议(。
管道工作一个小时很好,但一个小时后(与我为AWS设置的令牌到期设置相同(,抛出一个(来自StreamingFileSink(:
Caused by: com.amazonaws.services.s3.model.AmazonS3Exception: The provided token has expired. (Service: Amazon S3; Status Code: 400; Error Code: ExpiredToken; Request ID: 7YFGVQ92YT51DP0K; S3 Extended Request ID: sx6UJJ548o0wpwJbkoWJ16jKRVih3ZV9XQdbThNhq5kUU7A7yCx58tcCGELVs5tqGWaMMPfZxZM=; Proxy: webproxy)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1819)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleServiceErrorResponse(AmazonHttpClient.java:1403)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1372)
...
我使用的是AWSCredentialsProvider实现,它实现了getCredentials
,并使用aws中新解析的机密每隔15分钟刷新一次令牌。
我的假设是,问题在于我如何初始化作业本身中的StreamingFileSink
:
StreamExecutionEnvironment env = getStreamExecutionEnvironment();
StreamingFileSink<FELEvent> sink = StreamingFileSink
.forBulkFormat(<my Path Settings with basePath s3a://bucket/path/to/dir>)
.withRollingPolicy(OnCheckpointRollingPolicy.build())
.withNewBucketAssigner(<My custom bucket assigner>)
.build();
env.fromSource(<Kafka source>)
.map(<Some operation>)
.filter(<Some filtergin>)
.addSink(sink)
.name("name").uid("uid");
env.execute("TAG");
如果插件为已经初始化的StreamingFileSink刷新令牌,有什么想法吗?如果没有,处理这种情况的最佳方法是什么?
(由于与动物园管理员的兼容性问题,我使用了flink 14.3。(
编辑:
我检查了hadoop-fs插件代码,它似乎只在初始化FileSink时用提供的(读取(令牌初始化了一次S3对象。正在寻找以某种方式重新初始化它的方法。
设置
fs.s3a.aws.credentials.provider:com.amazonaws.auth.profile.ProfileCredentialsProvider
在作业管理器属性中,与环境变量AWS_PROFILES
一起添加到有效的AWS配置文件(如/.aws/config
(可以修复此问题。
确保您正在刷新您的代币。
更多信息:https://hadoop.apache.org/docs/stable/hadoop-aws/tools/hadoop-aws/index.html