Using a tumbling window in a join operation, but no elements are passed to my join function


public class FlinkWindowTest {
    public static long timestamp = 1496301598L;
    public static void main(String[] args) throws Exception {
        // get the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // get input data from a custom source that emits strings in a loop
        SourceFunction<String> out = new OutSource();
        DataStream<String> text = env.addSource(out);
        // parse the data
        DataStream<WordWithCount> windowCounts = text
                .flatMap(new FlatMapFunction<String, WordWithCount>() {
                    public void flatMap(String value, Collector<WordWithCount> out) {
                        for (String word : value.split(" ")) {
                            out.collect(new WordWithCount(word, 1L));
                        }
                    }
                });
        //assign timestamp
        windowCounts = windowCounts.assignTimestampsAndWatermarks(new MyTimestampExtractor(Time.seconds(0)));
        windowCounts.keyBy(new MyKeySelector())
                .join(windowCounts)
                .where(new MyKeySelector()).equalTo(new MyKeySelector())
                .window(TumblingEventTimeWindows.of(Time.seconds(10)))
                .apply(new JoinFunction<WordWithCount, WordWithCount, Object>() {
                    public Object join(WordWithCount wordWithCount, WordWithCount wordWithCount2) throws Exception {
                        System.out.println("start join");
                        System.out.println(wordWithCount.toString());
                        System.out.println(wordWithCount2.toString());
                        WordWithCount wordWithCount3 = new WordWithCount(wordWithCount.word, wordWithCount.count + wordWithCount2.count);
                        System.out.println(wordWithCount3.toString());
                        return wordWithCount3;
                    }
                });
        env.execute("Window WordCount");
    }
    public static class MyKeySelector implements KeySelector<WordWithCount, String> {
            public String getKey (WordWithCount wordWithCount) throws Exception {
                return wordWithCount.word;
            }
    }
    public static class MyTimestampExtractor extends BoundedOutOfOrdernessTimestampExtractor<WordWithCount> {
        public MyTimestampExtractor(Time maxOutOfOrderness) {
            super(maxOutOfOrderness);
        }
        public long extractTimestamp(WordWithCount wordWithCount) {
            return wordWithCount.getTimeStamp();
        }
    }
    public static class OutSource implements SourceFunction<String> {
        private String[] str = {
                "aa ff","bb gg","cc hh","dd kk"
        };
        public void run(SourceContext<String> sourceContext) throws Exception {
            int index =0;
            while (true) {
                if(index == str.length)
                    index = 0;
                sourceContext.collect(str[index]);
                index++;
            }
        }
        public void cancel() {
        }
    }
    // Data type for words with count and timestamp
    public static class WordWithCount {
        public String word;
        public long count;
        public WordWithCount() {}
        public long getTimeStamp() {
            return timestamp;
        }
        public WordWithCount(String word, long count) {
            this.word = word;
            this.count = count;
            ++timestamp;
        }
        @Override
        public String toString() {
            return word + " : " + count;
        }
    }
}

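This class is a demo. I created a SourceFunction that emits strings, which are then split into words. Finally, I use a join operation to join the stream with itself. I don't care about the count results. The problem is that my JoinFunction produces no output. I expected the output to be: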

start join
aa : 1
aa : 1
aa : 2
start join
........

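But right now there is no output at all: the elements seem to stay in the window and are never passed to the join function. I'm out of ideas here. If anyone has a suggestion, please let me know. I look forward to your replies. :)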

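You forgot to set the time characteristic to event time. Without it the job runs in processing time, no watermarks are emitted, and the event-time windows never fire. Add this right after creating the environment: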

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
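
To see why the elements stay buffered, here is a minimal plain-Java sketch (not Flink code) of how an event-time tumbling window behaves. The class name TumblingWindowSketch and its methods are hypothetical, made up for illustration. It assumes a window offset of 0 and non-negative millisecond timestamps, and it fires a window once the watermark reaches the window's last millisecond, which is my understanding of how Flink's event-time trigger decides.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Plain-Java illustration only: models event-time tumbling windows that
// buffer elements until a watermark passes the end of the window.
class TumblingWindowSketch {
    private final long sizeMs;
    // window start (ms) -> timestamps of the elements buffered in that window
    private final TreeMap<Long, List<Long>> windows = new TreeMap<>();

    TumblingWindowSketch(long sizeMs) {
        this.sizeMs = sizeMs;
    }

    // Start of the tumbling window that contains the timestamp
    // (assumes offset 0 and a non-negative timestamp).
    static long windowStart(long timestampMs, long sizeMs) {
        return timestampMs - (timestampMs % sizeMs);
    }

    // Buffers an element in the window its timestamp falls into.
    void add(long timestampMs) {
        long start = windowStart(timestampMs, sizeMs);
        windows.computeIfAbsent(start, k -> new ArrayList<>()).add(timestampMs);
    }

    // Removes and returns every window whose last millisecond is <= watermark.
    // Nothing is ever returned if this method is never called.
    List<List<Long>> advanceWatermark(long watermarkMs) {
        List<List<Long>> fired = new ArrayList<>();
        while (!windows.isEmpty() && windows.firstKey() + sizeMs - 1 <= watermarkMs) {
            fired.add(windows.pollFirstEntry().getValue());
        }
        return fired;
    }

    int bufferedWindows() {
        return windows.size();
    }

    public static void main(String[] args) {
        TumblingWindowSketch sketch = new TumblingWindowSketch(10_000L);
        long first = 1496301598L;
        // Timestamps grow by 1 ms per element, like the counter in the question.
        for (long ts = first; ts < first + 15_000L; ts++) {
            sketch.add(ts);
        }
        System.out.println("buffered windows before any watermark: " + sketch.bufferedWindows());
        List<List<Long>> fired = sketch.advanceWatermark(first + 14_999L);
        System.out.println("windows fired after the watermark: " + fired.size());
    }
}
```

In this sketch the elements simply pile up until advanceWatermark is called, which mirrors the situation in the question: the windows are filled, but nothing tells them that event time has moved on. Note also that Flink interprets timestamps as milliseconds, so a 10-second window spans 10,000 increments of the question's counter.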
