Flink S3 StreamingFileSink未将文件写入S3



我正在做一个POC,用于使用Flink将数据写入S3。程序没有给出错误。然而,我也没有看到任何文件正在S3中编写。

以下是代码


public class StreamingJob {
public static void main(String[] args) throws Exception {
// set up the streaming execution environment
final String outputPath = "s3a://testbucket-s3-flink/data/";
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

//Enable checkpointing
env.enableCheckpointing();
//S3 Sink
final StreamingFileSink<String> sink = StreamingFileSink
.forRowFormat(new Path(outputPath), new SimpleStringEncoder<String>("UTF-8"))
.build();

//Source is a local kafka
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "kafka:9094");
properties.setProperty("group.id", "test");

DataStream<String> input = env.addSource(new FlinkKafkaConsumer<String>("queueing.transactions", new SimpleStringSchema(), properties));


input.flatMap(new Tokenizer()) // Tokenizer for generating words
.keyBy(0) // Logically partition the stream for each word
.timeWindow(Time.minutes(1)) // Tumbling window definition
.sum(1) // Sum the number of words per partition
.map(value -> value.f0 + " count: " + value.f1.toString() + "n")
.addSink(sink);
// execute program
env.execute("Flink Streaming Java API Skeleton");
}
public static final class Tokenizer
implements FlatMapFunction<String, Tuple2<String, Integer>> {
@Override
public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
String[] tokens = value.toLowerCase().split("\W+");
for (String token : tokens) {
if (token.length() > 0) {
out.collect(new Tuple2<>(token, 1));
}
}
}
}
}

请注意,我已经在配置中设置了s3.access-keys3.secret-key的值,并通过将它们更改为不正确的值进行了测试(我在不正确值上得到了一个错误(

有什么问题吗?

可能是遇到了这个问题吗?

考虑到Flink接收器和UDF通常不会区分正常作业终止(例如有限输入流(和因故障而终止,在作业正常终止时,最后一个正在进行的文件将不会转换为"已完成"状态。

相关内容

  • 没有找到相关文章

最新更新