When I use Flink 1.12 in batch mode, my code is:
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class Main {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Execute the DataStream program in batch mode
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);

        DataStream<Person> flintstones = env.fromElements(
                new Person("Fred", 35),
                new Person("Wilma", 35),
                // new Person("Pebbles", 2),
                new Person("Pebbles", 2)
        );

        // Group by age and sum the ages within each group
        flintstones.keyBy(person -> person.age)
                .reduce((a, b) -> {
                    a.age = a.age + b.age;
                    return a;
                })
                .print();

        env.execute();
    }

    public static class Person {
        public String name;
        public Integer age;

        public Person() {}

        public Person(String name, Integer age) {
            this.name = name;
            this.age = age;
        }

        @Override
        public String toString() {
            return this.name.toString() + ": age " + this.age.toString();
        }
    }
}
I expected the result to be:
Fred: age 70
Pebbles: age 2
but the actual result is:
11> Fred: age 70
(the "Pebbles: age 2" record is missing)
If I uncomment the line new Person("Pebbles", 2), I get the correct result:
1> Pebbles: age 4
11> Fred: age 70
The DataSet API also gives the correct result. Code:
public static void main(String[] args) throws Exception {
    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    DataSet<Person> flintstones = env.fromElements(
            new Person("Fred", 35),
            new Person("Wilma", 35),
            // new Person("Pebbles", 2),
            new Person("Pebbles", 2)
    );
    flintstones.groupBy(person -> person.age)
            .reduce((a, b) -> {
                a.age = a.age + b.age;
                return a;
            })
            .print();
    // In the DataSet API, print() triggers execution itself, so env.execute() is not needed
    // env.execute();
}
Result:
Fred: age 70
Pebbles: age 2
The code is only for testing and has no business meaning. Is this a bug, or am I misunderstanding something?
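For reference, the expected semantics of a keyed reduce can be modeled in plain Java without Flink. This is a minimal sketch, not Flink code: the class KeyedReduceModel and the method keyedReduce are hypothetical names, and the reducer returns a new Person instead of mutating its first argument. Every key seen at least once yields exactly one result, which is why "Pebbles: age 2" should appear even though its key has a single element.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;
import java.util.function.Function;

// Hypothetical plain-Java model of a keyed reduce; no Flink involved.
public class KeyedReduceModel {

    static class Person {
        final String name;
        final int age;

        Person(String name, int age) {
            this.name = name;
            this.age = age;
        }

        @Override
        public String toString() {
            return name + ": age " + age;
        }
    }

    // Folds each element into the running value for its key (Map.merge calls
    // reducer(oldValue, newElement)); a key with one element keeps that element.
    static <K, T> Map<K, T> keyedReduce(Iterable<T> input, Function<T, K> keyOf, BinaryOperator<T> reducer) {
        Map<K, T> acc = new LinkedHashMap<>();
        for (T element : input) {
            acc.merge(keyOf.apply(element), element, reducer);
        }
        return acc;
    }

    public static void main(String[] args) {
        List<Person> flintstones = List.of(
                new Person("Fred", 35), new Person("Wilma", 35), new Person("Pebbles", 2));
        // Key by age and sum the ages, like the reduce in the question
        Map<Integer, Person> result = keyedReduce(flintstones, p -> p.age,
                (a, b) -> new Person(a.name, a.age + b.age));
        result.values().forEach(System.out::println);
    }
}
```

Running main prints "Fred: age 70" followed by "Pebbles: age 2", the same as the DataSet result above.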
My Maven dependencies:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>1.12.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.12</artifactId>
    <version>1.12.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_2.12</artifactId>
    <version>1.12.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-queryable-state-runtime_2.12</artifactId>
    <version>1.12.0</version>
</dependency>
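There is a bug in reduce in batch execution mode. It has already been fixed in master, and the fix will be included in 1.12.1. See FLINK-20764. This matches the output in the question: only the key with a single element (Pebbles, age 2) is missing, while keys with two or more elements are emitted.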
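The symptom (only single-element keys disappear) can be illustrated with a simplified model. This is a hypothetical sketch and NOT Flink's operator code: it assumes the batch reduce keeps one value per key and marks a key for emission at end of input only when a reduce actually happens, so a key with a single element is never emitted. BatchReduceBugModel, run and the buggy flag are illustrative names; buggy = false marks every key on its first element instead.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.BinaryOperator;

// Hypothetical model for illustration only; this is NOT Flink's operator code.
public class BatchReduceBugModel {

    // Keeps one running value per key, then emits the value of every key that was
    // marked for emission once the input is exhausted.
    // buggy = true: a key is marked only when a reduce happens (2 or more elements).
    // buggy = false: a key is marked as soon as its first element arrives.
    static <K, V> List<V> run(List<Map.Entry<K, V>> input, BinaryOperator<V> reducer, boolean buggy) {
        Map<K, V> state = new LinkedHashMap<>();
        Set<K> marked = new HashSet<>();
        for (Map.Entry<K, V> e : input) {
            V current = state.get(e.getKey());
            if (current != null) {
                state.put(e.getKey(), reducer.apply(current, e.getValue()));
                marked.add(e.getKey());
            } else {
                state.put(e.getKey(), e.getValue());
                if (!buggy) {
                    marked.add(e.getKey());
                }
            }
        }
        // "End of input": emit final values of marked keys, in first-seen key order
        List<V> out = new ArrayList<>();
        for (Map.Entry<K, V> s : state.entrySet()) {
            if (marked.contains(s.getKey())) {
                out.add(s.getValue());
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Key = age, value = age, reduce = sum, mirroring the question's reduce
        List<Map.Entry<Integer, Integer>> original =
                List.of(Map.entry(35, 35), Map.entry(35, 35), Map.entry(2, 2));
        List<Map.Entry<Integer, Integer>> twoPebbles =
                List.of(Map.entry(35, 35), Map.entry(35, 35), Map.entry(2, 2), Map.entry(2, 2));
        System.out.println(run(original, Integer::sum, true));   // singleton key 2 is dropped
        System.out.println(run(twoPebbles, Integer::sum, true)); // key 2 is reduced, so it is emitted
        System.out.println(run(original, Integer::sum, false));  // every key is emitted
    }
}
```

With buggy = true, the original input yields [70] and adding a second Pebbles yields [70, 4], mirroring the two outputs in the question; with buggy = false the original input yields [70, 2].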