Is this a bug in Flink 1.12 batch execution mode?



I am using Flink 1.12 in batch execution mode with the following code:

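import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class Main {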
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);
        DataStream<Person> flintstones = env.fromElements(
                new Person("Fred", 35),
                new Person("Wilma", 35),
//                new Person("Pebbles", 2),
                new Person("Pebbles", 2)
        );
        flintstones.keyBy(person -> person.age)
                .reduce((a, b) -> {
                    a.age = a.age + b.age; return a;
                }).print();
        env.execute();
    }
    public static class Person {
        public String name;
        public Integer age;
        public Person() {}
        public Person(String name, Integer age) {
            this.name = name;
            this.age = age;
        }
        @Override
        public String toString() {
            return this.name.toString() + ": age " + this.age.toString();
        }
    }
}

I expected the result to be:

Fred: age 70
Pebbles: age 2

But the actual result is:

11> Fred: age 70

The record Pebbles: age 2 is lost. If I uncomment the second new Person("Pebbles", 2), I get the correct result:

1> Pebbles: age 4
11> Fred: age 70

If I use the DataSet API instead, I also get the correct result. Code:

    public static void main(String[] args) throws Exception {
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<Person> flintstones = env.fromElements(
                new Person("Fred", 35),
                new Person("Wilma", 35),
//                new Person("Pebbles", 2),
                new Person("Pebbles", 2)
        );
        flintstones.groupBy(person -> person.age)
                .reduce((a, b) -> {
                    a.age = a.age + b.age; return a;
                }).print();
//        env.execute();
    }

Result:

Fred: age 70
Pebbles: age 2

The code is only a test and has no business meaning. Is this a bug, or have I misunderstood something?

My Maven dependencies:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>1.12.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.12</artifactId>
    <version>1.12.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_2.12</artifactId>
    <version>1.12.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-queryable-state-runtime_2.12</artifactId>
    <version>1.12.0</version>
</dependency>

There is a bug in reduce in batch execution mode. It has already been fixed in master, and the fix will be included in 1.12.1. See FLINK-20764.
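
To make the expected behavior concrete, here is a minimal plain-Java sketch of what a keyed reduce over bounded input should produce: one final value per key, including keys that have only a single record. It does not use Flink and is not how Flink implements the operator; the class name KeyedReduceSketch and the method reduceByAge are made up for illustration. Judging from the output in the question, the failing case is exactly the key with a single record (Pebbles), for which the reduce function is never called.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration only: models the expected result of
// keyBy(person -> person.age).reduce(...) on bounded input, without Flink.
public class KeyedReduceSketch {

    public static class Person {
        public String name;
        public Integer age;

        public Person(String name, Integer age) {
            this.name = name;
            this.age = age;
        }

        @Override
        public String toString() {
            return name + ": age " + age;
        }
    }

    // Groups by the original age and sums ages within each group.
    // A key with a single record never triggers the "reduce" branch,
    // but its record must still be emitted unchanged.
    public static List<Person> reduceByAge(List<Person> input) {
        Map<Integer, Person> perKey = new LinkedHashMap<>();
        for (Person p : input) {
            Person acc = perKey.get(p.age);
            if (acc == null) {
                // First record for this key: it becomes the accumulator.
                perKey.put(p.age, new Person(p.name, p.age));
            } else {
                // Later records: same logic as the reduce lambda in the question.
                acc.age = acc.age + p.age;
            }
        }
        return new ArrayList<>(perKey.values());
    }

    public static void main(String[] args) {
        List<Person> flintstones = Arrays.asList(
                new Person("Fred", 35),
                new Person("Wilma", 35),
                new Person("Pebbles", 2));
        for (Person p : reduceByAge(flintstones)) {
            System.out.println(p);
        }
        // Prints:
        // Fred: age 70
        // Pebbles: age 2
    }
}
```

This matches the DataSet result in the question, so the expectation there is reasonable. Until you can upgrade to a release that contains the fix, the DataSet version from the question is one way to get the complete output.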
