public class FlinkWindowTest {
public static long timestamp = 1496301598L;
public static void main(String[] args) throws Exception {
// get the execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// get input data by connecting to the socket
SourceFunction<String> out = new OutSource();
DataStream<String> text = env.addSource(out);
// parse the data
DataStream<WordWithCount> windowCounts = text
.flatMap(new FlatMapFunction<String, WordWithCount>() {
public void flatMap(String value, Collector<WordWithCount> out) {
for (String word : value.split(" ")) {
out.collect(new WordWithCount(word, 1L));
}
}
});
//assign timestamp
windowCounts = windowCounts.assignTimestampsAndWatermarks(new MyTimestampExtractor(Time.seconds(0)));
windowCounts.keyBy(new MyKeySelector())
.join(windowCounts)
.where(new MyKeySelector()).equalTo(new MyKeySelector())
.window(TumblingEventTimeWindows.of(Time.seconds(10)))
.apply(new JoinFunction<WordWithCount, WordWithCount, Object>() {
public Object join(WordWithCount wordWithCount, WordWithCount wordWithCount2) throws Exception {
System.out.println("start join");
System.out.println(wordWithCount.toString());
System.out.println(wordWithCount2.toString());
WordWithCount wordWithCount3 = new WordWithCount(wordWithCount.word, wordWithCount.count + wordWithCount2.count);
System.out.println(wordWithCount3.toString());
return wordWithCount3;
}
});
env.execute("Window WordCount");
}
public static class MyKeySelector implements KeySelector<WordWithCount, String> {
public String getKey (WordWithCount wordWithCount) throws Exception {
return wordWithCount.word;
}
}
public static class MyTimestampExtractor extends BoundedOutOfOrdernessTimestampExtractor<WordWithCount> {
public MyTimestampExtractor(Time maxOutOfOrderness) {
super(maxOutOfOrderness);
}
public long extractTimestamp(WordWithCount wordWithCount) {
return wordWithCount.getTimeStamp();
}
}
public static class OutSource implements SourceFunction<String> {
private String[] str = {
"aa ff","bb gg","cc hh","dd kk"
};
public void run(SourceContext<String> sourceContext) throws Exception {
int index =0;
while (true) {
if(index == str.length)
index = 0;
sourceContext.collect(str[index]);
index++;
}
}
public void cancel() {
}
}
// Data type for words with count and timestamp
public static class WordWithCount {
public String word;
public long count;
public WordWithCount() {}
public long getTimeStamp() {
return timestamp;
}
public WordWithCount(String word, long count) {
this.word = word;
this.count = count;
++timestamp;
}
@Override
public String toString() {
return word + " : " + count;
}
}
}
这个类是一个演示。我创建了一个 SourceFunction 来持续发出字符串，然后将它们拆分成单词。最后，我使用 join 操作将该流与其自身进行连接（自连接）。我并不关心计数结果。问题是我的 JoinFunction 中没有任何输出。我预期的输出应该是：
start join
aa : 1
aa : 1
aa : 2
start join
........
但现在没有任何输出：元素似乎一直停留在窗口中，从未被发送到 JoinFunction。我不知道该如何解决这个问题。如果有人有建议，请在这里告诉我，期待大家的回复。:)
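您忘记将时间特征（time characteristic）设置为事件时间（EventTime）。在 Flink 1.12 之前，默认的时间特征是处理时间，此时周期性水印分配器不会生成水印，事件时间窗口永远不会触发，因此 JoinFunction 也不会被调用。添加以下设置即可：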
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);