是否有人成功使用Apache Flink 0.9处理存储在AWS S3上的数据?我发现他们正在使用自己的S3FileSystem,而不是来自Hadoop的一个…但看起来它不起作用。我将路径设置为s3://bucket.s3.amazonaws.com/folder它失败了,有以下异常:
. io .IOException: Cannot establish connection to Amazon S3:com.amazonaws.services.s3.model。AmazonS3Exception:请求我们计算的签名与你提供的签名不符检查您的密钥和签名方法。(服务:Amazon S3;状态码:
403;
2016年5月更新: Flink文档现在有一个关于如何使用Flink与AWS的页面
这个问题在Flink用户邮件列表中也有人问过,我已经在那里回答了:http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Processing-S3-data-with-Apache-Flink-td3046.html
tl;博士:
Flink项目
public class S3FileSystem {
public static void main(String[] args) throws Exception {
ExecutionEnvironment ee = ExecutionEnvironment.createLocalEnvironment();
DataSet<String> myLines = ee.readTextFile("s3n://my-bucket-name/some-test-file.xml");
myLines.print();
}
}
将以下内容添加到core-site.xml并使其可用于Flink:
<property>
<name>fs.s3n.awsAccessKeyId</name>
<value>putKeyHere</value>
</property>
<property>
<name>fs.s3n.awsSecretAccessKey</name>
<value>putSecretHere</value>
</property>
<property>
<name>fs.s3n.impl</name>
<value>org.apache.hadoop.fs.s3native.NativeS3FileSystem</value>
</property>
您可以从CloudFormation模板的输出部分指定的S3桶中检索工件。即,在Flink运行时启动并运行后,可以将出租车流处理器程序提交给Flink运行时,以开始实时分析Amazon Kinesis流中的旅行事件。
$ aws s3 cp s3://«artifact-bucket»/artifacts/flink-taxi-stream-processor-1.0.jar .
$ flink run -p 8 flink-taxi-stream-processor-1.0.jar --region «AWS region» --stream «Kinesis stream name» --es-endpoint https://«Elasticsearch endpoint»
以上两个命令都使用Amazon的S3作为源,您必须相应地指定工件名称。
注意:您可以按照下面的链接,使用EMR和S3桶创建管道。
https://aws.amazon.com/blogs/big-data/build-a-real-time-stream-processing-pipeline-with-apache-flink-on-aws/