Flink DataStream排序程序不输出



我在Flink中编写了一个小的测试用例代码来对数据流进行排序。代码如下:

public enum StreamSortTest {
;
public static class MyProcessWindowFunction extends ProcessWindowFunction<Long,Long,Integer, TimeWindow> {
@Override
public void process(Integer key, Context ctx, Iterable<Long> input, Collector<Long> out) {
List<Long> sortedList = new ArrayList<>();
for(Long i: input){
sortedList.add(i);
}
Collections.sort(sortedList);
sortedList.forEach(l -> out.collect(l));
}
}
public static void main(final String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(2);
env.getConfig().setExecutionMode(ExecutionMode.PIPELINED);
DataStream<Long> probeSource = env.fromSequence(1, 500).setParallelism(2);
// range partition the stream into two parts based on data value
DataStream<Long> sortOutput =
probeSource
.keyBy(x->{
if(x<250){
return 1;
} else {
return 2;
}
})
.window(TumblingProcessingTimeWindows.of(Time.seconds(20)))
.process(new MyProcessWindowFunction())
;
sortOutput.print();
System.out.println(env.getExecutionPlan());
env.executeAsync();
}
}

然而,代码只是输出执行计划和其他几行。但它不会输出实际排序的数字。我做错了什么?

我看到的主要问题是,您使用的是基于ProcessingTime的窗口,输入数据非常短,肯定会在20秒内处理完毕。而Flink能够检测输入的结束(如果是来自文件或序列的流,如您的情况(并生成Long.Max水印,这将关闭所有打开的基于事件时间的窗口并启动所有基于事件时间定时器。它对基于ProcessingTime的计算没有做同样的事情,所以在您的情况下,您需要断言Flink实际上会工作足够长的时间,以便关闭您的窗口或引用自定义触发器/不同的时间特征。

还有一件事我不确定,因为我从来没有用过那么多,那就是你是否应该使用executeAsync进行本地执行,因为根据这里的文档,这基本上是针对你不想等待作业结果的情况。

相关内容

  • 没有找到相关文章

最新更新