我目前正在使用 Flink 1.7 + gcs-connector lib。我正在尝试让流文件接收器写入 GCS 存储桶,但遇到了以下异常:
我遇到了这个 Jira:https://issues.apache.org/jira/browse/FLINK-11838 - 但我不清楚代码是否被合并过。
非常感谢需要做什么的任何帮助才能使其发挥作用?
java.lang.UnsupportedOperationException: Recoverable writers on Hadoop are only supported for HDFS and for Hadoop version 2.7 or newer
at org.apache.flink.runtime.fs.hdfs.HadoopRecoverableWriter.<init>(HadoopRecoverableWriter.java:57) ~[flink-hadoop-fs-1.7.0.jar:1.7.0]
at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.createRecoverableWriter(HadoopFileSystem.java:202) ~[flink-hadoop-fs-1.7.0.jar:1.7.0]
at org.apache.flink.core.fs.SafetyNetWrapperFileSystem.createRecoverableWriter(SafetyNetWrapperFileSystem.java:69) ~[flink-core-1.7.0.jar:1.7.0]
at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.<init>(Buckets.java:112) ~[flink-streaming-java_2.11-1.7.0.jar:1.7.0]
at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink$BulkFormatBuilder.createBuckets(StreamingFileSink.java:317) ~[flink-streaming-java_2.11-1.7.0.jar:1.7.0]
at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.initializeState(StreamingFileSink.java:327) ~[flink-streaming-java_2.11-1.7.0.jar:1.7.0]
at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178) ~[flink-streaming-java_2.11-1.7.0.jar:1.7.0]
at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160) ~[flink-streaming-java_2.11-1.7.0.jar:1.7.0]
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96) ~[flink-streaming-java_2.11-1.7.0.jar:1.7.0]
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:278) ~[flink-streaming-java_2.11-1.7.0.jar:1.7.0]
at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738) ~[flink-streaming-java_2.11-1.7.0.jar:1.7.0]
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289) ~[flink-streaming-java_2.11-1.7.0.jar:1.7.0]
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704) ~[flink-runtime_2.11-1.7.0.jar:1.7.0]
at java.lang.Thread.run(Thread.java:834) ~[?:?]
据我所知,它没有被合并。
问题在于,为了让 Flink 接收器组件参与检查点保存和恢复,它需要能够重置接收器组件正在写入的任何内容的状态。 对于写入文件的接收器,这意味着接收器可能必须截断文件并开始追加到该文件,以便将文件返回到文件在上次检查点时所处的状态。GCS和亚马逊的S3一样,是一个二进制对象存储,而不是一个真正的文件系统。 虽然您可以使用二进制存储执行大多数操作,就像使用文件系统一样,但不能截断二进制对象,也不能追加到它。 可以让你看起来正在截断和追加到二进制对象,但这是一个非常低效的 API 层。 出于这个原因,尝试在GCS(或S3(上使用StreamingFileSink真的不是很有效。
您最好写入实际文件系统,然后添加最后一步将写入的文件传输到GCS。 这意味着你可能必须编写另一个接收器,以便检查点体系结构涵盖行为,但这是你最好的选择。