How to write the result of each sliding window of a Flink program to a new file, instead of appending the results of all windows to one file



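Below is a Flink program (Java) that reads tweets from files, extracts the hashtags, counts the number of occurrences of each hashtag, and finally writes the results to a file.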

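The program uses a sliding window with a size of 20 seconds that slides every 5 seconds. In the sink, all output data is written to a file named outfile. This means that a window fires every 5 seconds and its data is written to outfile.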
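To make the window arithmetic concrete, here is a minimal plain-Java sketch (not Flink code) that lists the sliding windows a single event timestamp falls into. The class and method names are made up for illustration, and it assumes non-negative millisecond timestamps and no window offset.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for illustration only; not part of Flink.
class SlidingWindowSketch {

    // Returns the start timestamps (ms) of every sliding window that contains
    // the given event timestamp. A window covers [start, start + sizeMs).
    // Assumes timestampMs >= 0 and a window offset of zero.
    static List<Long> windowStarts(long timestampMs, long sizeMs, long slideMs) {
        List<Long> starts = new ArrayList<>();
        // Latest window start that is not after the event.
        long lastStart = timestampMs - (timestampMs % slideMs);
        // Walk backwards one slide at a time while the window still covers the event.
        for (long start = lastStart; start > timestampMs - sizeMs; start -= slideMs) {
            starts.add(start);
        }
        return starts;
    }
}
```

With a size of 20 seconds and a slide of 5 seconds, an event at 32000 ms belongs to the four windows starting at 30000, 25000, 20000 and 15000 ms, so every tweet is counted in size / slide = 4 overlapping windows, and one of them closes every 5 seconds.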

My question:

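I want the data to be written to a new file each time a window fires (that is, every 5 seconds), instead of being appended to the same file. Please advise where and how this can be done. Do I need a custom trigger, some configuration on the sink, or something else?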

Code:

<!-- language: lang-java -->
StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.getConfig().setAutoWatermarkInterval(100);
env.enableCheckpointing(5000,CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(5000);
String path = "C:\\Users\\eventTime"; // backslashes must be escaped in a Java string literal
// Reading data from files of folder eventTime.
DataStream<String> streamSource = env.readFile(new TextInputFormat(new Path(path)), path, FileProcessingMode.PROCESS_CONTINUOUSLY, 1000).uid("read-1");
//Extracting the hash tags of tweets
DataStream<Tuple3<String, Integer, Long>> mapStream = streamSource.map(new ExtractHashTagFunction());   
//generating watermarks and extracting the timestamps from tweets
DataStream<Tuple3<String, Integer, Long>> withTimestampsAndWatermarks = mapStream.assignTimestampsAndWatermarks(new MyTimestampsAndWatermarks());
KeyedStream<Tuple3<String, Integer, Long>,Tuple> keyedStream = withTimestampsAndWatermarks.keyBy(0);
//Using sliding window of 20 seconds which slide by 5 seconds.
SingleOutputStreamOperator<Tuple4<String, Integer, Long, String>> aggregatedStream = keyedStream.window(SlidingEventTimeWindows.of(Time.seconds(20),Time.seconds(5)))
        .aggregate(new AggregateHashTagCountFunction()).uid("agg-123");                 
aggregatedStream.writeAsText("C:\\Users\\outfile", WriteMode.NO_OVERWRITE).setParallelism(1).uid("write-1");
env.execute("twitter-analytics");

If the built-in sinks do not do what you need, you can define a custom sink:

stream.addSink(new MyCustomSink ...)

MyCustomSink should implement SinkFunction.

Your custom sink would hold a file writer and, for example, a counter. Each time the sink is invoked, it writes to "/path/to/file + counter.yourFileExtension".

https://ci.apache.org/projects/flink/flink-docs-release-1.4/api/java/org/apache/flink/streaming/api/functions/sink/SinkFunction.html
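One thing to keep in mind: `SinkFunction.invoke` is called once per record, not once per window, and a keyed window emits one record per hashtag each time it fires. A counter incremented on every call would therefore give one file per record. A possible variation, sketched below in plain Java, is to derive the file name from the window's end timestamp so that all records of one window land in the same file. `PerWindowFileWriter` and its parameters are hypothetical names, and the sketch assumes each record carries its window end timestamp, which the code in the question does not confirm.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Hypothetical helper for illustration only; not part of Flink.
// A custom SinkFunction could call write(...) from its invoke(...) method.
class PerWindowFileWriter {
    private final Path directory;
    private final String prefix;
    private final String extension;

    PerWindowFileWriter(Path directory, String prefix, String extension) {
        this.directory = directory;
        this.prefix = prefix;
        this.extension = extension;
    }

    // Appends one line to the file that belongs to the given window.
    // The file is created when the first record of that window arrives,
    // so every window end timestamp maps to its own file.
    Path write(long windowEndMs, String line) throws IOException {
        Files.createDirectories(directory);
        Path target = directory.resolve(prefix + "-" + windowEndMs + extension);
        Files.write(
                target,
                (line + System.lineSeparator()).getBytes(StandardCharsets.UTF_8),
                StandardOpenOption.CREATE,
                StandardOpenOption.APPEND);
        return target;
    }
}
```

Inside the sink, `invoke(value)` would extract the window end from the record and pass it to `write` together with the formatted record. If the aggregate result does not already contain the window end, one way to attach it is the `aggregate(AggregateFunction, ProcessWindowFunction)` overload, where the `ProcessWindowFunction` can read `context.window().getEnd()`. With the sink parallelism kept at 1, as in the question, there is a single writer per file; this sketch does not deal with checkpoint recovery or exactly-once guarantees.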
