我想使用 presto 接口和 BucketingSink 从 Flink 1.4.2 写入 S3。我按照 flink-conf.yaml s3.access-key 和 s3.secret-key 中添加的说明进行操作,并将 flink-s3-fs-presto-1.4.2.jar 放在 lib 文件夹中。下面是产生的错误。
如果作业在 AWS 环境中执行,我希望我根本不需要设置密钥。我是这个假设是正确的。
java.lang.IllegalArgumentException: AWS Access Key ID and Secret Access Key must be specified as the username or password (respectively) of a s3 URL, or by setting the fs.s3.awsAccessKeyId or fs.s3.awsSecretAccessKey properties (respectively).
at org.apache.hadoop.fs.s3.S3Credentials.initialize(S3Credentials.java:70)
at org.apache.hadoop.fs.s3.Jets3tFileSystemStore.initialize(Jets3tFileSystemStore.java:93)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:191)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
at com.sun.proxy.$Proxy17.initialize(Unknown Source)
at org.apache.hadoop.fs.s3.S3FileSystem.initialize(S3FileSystem.java:91)
at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.createHadoopFileSystem(BucketingSink.java:1206)
at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initFileSystem(BucketingSink.java:411)
at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initializeState(BucketingSink.java:355)
at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:258)
at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:694)
at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:682)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
at java.lang.Thread.run(Thread.java:748)
该应用程序似乎根本没有使用flink-s3-fs-presto
,而是Hadoop已弃用的旧S3文件系统。您粘贴的堆栈跟踪指示未为文件系统方案"s3://"选取flink-s3-fs-presto
。
请确保flink-s3-fs-presto
JAR文件确实位于执行作业的任务管理器的lib
文件夹中,而不仅仅是在客户端上。
- 当你使用 YARN 或 Mesos 部署 Flink 作业时,这应该会自动发生。
- 当你通过容器部署 Flink 时,请确保 JAR 文件位于容器镜像的 lib 文件夹中。
- 当你独立或手动运行 Flink 任务管理器时,请确保集群中的所有任务管理器在启动之前在 lob 文件夹中都有 JAR 文件。