我正在尝试为我的流式处理输出创建一个 s3 接收器。 我认为BucketingSink
会很好,因为它用于HDFS。 但似乎 S3 网址未被识别为 hdfs。我收到以下错误:
Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: java.lang.RuntimeException: Error while creating FileSystem when initializing the state of the BucketingSink.
Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Cannot support file system for 's3' via Hadoop, because Hadoop is not in the classpath, or some classes are missing from the classpath.
有没有办法让 S3 适用于BucketingSink
,或者除了BucketingSink
之外还有其他选项可供我使用? 我目前正在运行 1.5.2。 很乐意提供任何其他信息。
谢谢!
编辑:
我的接收器创建/使用如下所示:
val s3Sink = new BucketingSink[String]("s3://s3bucket/sessions")
s3Sink.setBucketer(new DateTimeBucketer[String]("yyyy-MM-dd--HHmm"))
s3Sink.setWriter(new StringWriter[String]())
s3Sink.setBatchSize(200)
s3Sink.setPendingPrefix("sessions-")
s3Sink.setPendingSuffix(".csv")
// Create stream and do stuff here
stream.addSink(s3Sink)
可能你必须在你的 Flink 作业中包含hadoop-aws
jar。请参阅此链接将有所帮助:https://ci.apache.org/projects/flink/flink-docs-release-1.6/ops/deployment/aws.html#provide-s3-filesystem-dependency