我相信,当连续完成两个分裂时,flink的行为奇怪。我的实施逻辑可能会犯一些错误,这就是为什么我在此处发表您的意见的原因。
最少的示例:我有一个文本文件,其中包含苹果,香蕉和橙色单词。我将其作为源传递环境传递。我进行了第一个拆分,其中选择条件是参数是"苹果"一词。如果是,我将其放在"主题" apples 中,否则在"主题" notapples 中。然后,我在此拆分流上选择"主题" notapples ,然后再次将其拆分,但是这次条件检查参数是否为"橙色"一词。如果是,则将其放在"主题" oranges 中,否则在"主题" notoranges 。
中最后,当我打印最后一个拆分流的主题 notoranges 时,我的期望仅在打印"香蕉"一词时。但是,我实际上印刷过的是"苹果"one_answers"香蕉"一词。我注意到,当完成第二次拆分时,处理的流并不是仅包含我从(即NotApples(而是所有元素选择的主题元素的流。我想念什么吗?
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> datastream = env.readTextFile("input.txt");
SplitStream<String> splitStream1 = datastream.split(new OutputSelector<String>() {
@Override
public Iterable<String> select(String arg0) {
List<String> output = new ArrayList<String>();
if (arg0.equals("Apple")) {
output.add("Apples");
} else {
output.add("NotApples");
}
return output;
}
});
DataStream<String> notApplesStream = splitStream1.select("NotApples");
SplitStream<String> splitStream2 = notApplesStream.split(new OutputSelector<String>() {
@Override
public Iterable<String> select(String arg0) {
List<String> output = new ArrayList<String>();
if (arg0.equals("Orange")) {
output.add("Oranges");
} else {
output.add("NotOranges");
}
return output;
}
});
DataStream<String> notApplesAndNotOrangesStream = splitStream2.select("NotOranges");
notApplesAndNotOrangesStream.print();
env.execute("SplitTest");
输出:
1> Apple
1> Apple
1> Banana
2> Apple
2> Apple
2> Apple
4> Apple
4> Apple
4> Banana
3> Apple
3> Banana
3> Apple
nb。:我知道我可以进行单个拆分以实现相同的逻辑(其中我检查参数是"苹果"或" organge"(。但是,这不是我问题的重点。最初,我在一个更复杂的程序中注意到了这种行为,我编写了两次有效的拆分,因此我决定尝试在最小的示例中重新创建它,以检查我是否可以重现它。
最近在邮件列表上进行了关于这种不正确行为的讨论,主题是"贬低split/for dataStream api"。我认为关键评论是:
首先,我们必须承认当前的拆分/选择实现 有缺陷。我大致浏览了源代码,问题可能是 对于连续的选择/拆分,前者将被覆盖 后来在流图生成阶段中一个。这就是为什么我们禁止这个 Flink-111084中的连续逻辑。
查看Flink-111084和由此产生的补丁后,我相信,如果您连续两次分开/选择,Flink的最新发行版将引发异常。
鉴于我对实现分裂/选择的了解,如果这不起作用,我不会感到惊讶(尽管我不知道足够确定(。此外,拆分/选择最近已被弃用(尽管目前尚不清楚它是否真的会消失(。
进行拆分/选择的更好方法是通过侧面输出。这是一种更强大的机制,具有更清洁的实现。