Flink (1.2) windows producing more than 1 output per window



Problem: the program writes to Kafka more than once per window (it creates 2-3 or more lines per window, while it should create exactly 1 line per window, since the reduce function leaves only a single element). I wrote the same code in Spark and it works perfectly. I have been trying to find information about this issue, but I haven't found anything :(. I have also tried changing the parallelism of some of the functions, among other things, but nothing worked, and I cannot figure out where the problem might be.

I am testing Flink latency. Here is the environment where the problem occurs:

Cluster: I am using Flink 1.2.0 and OpenJDK 8. I have 3 machines: 1 JobManager and 2 TaskManagers (4 cores, 2 GB RAM and 4 task slots per TaskManager).

Input data: lines produced by a Java generator to a 24-partition Kafka topic, each containing two elements: an incremental value and its creation timestamp (a sketch of such a generator follows the list):

  • 1 1497790546981
  • 2 1497790546982
  • 3 1497790546983
  • 4 1497790546984
  • ...
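
For reference, a minimal sketch of such a generator, assuming the standard Kafka Java client (org.apache.kafka:kafka-clients); the class name InputGenerator and the topic name testIn are hypothetical (the job below takes the real topic name from args[0]):

    //Minimal sketch of the input generator (hypothetical names: InputGenerator, testIn)
    import java.util.Properties;
    
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.StringSerializer;
    
    public class InputGenerator {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.setProperty("bootstrap.servers", "192.168.0.155:9092");
            props.setProperty("key.serializer", StringSerializer.class.getName());
            props.setProperty("value.serializer", StringSerializer.class.getName());
            try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
                long counter = 1;
                while (true) {
                    //Each line: incremental value + creation timestamp in ms
                    String line = counter++ + " " + System.currentTimeMillis();
                    producer.send(new ProducerRecord<String, String>("testIn", line));
                    Thread.sleep(1); //keeps the rate under ~1000 records/s
                }
            }
        }
    }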

My Java class:

  • It reads from a Kafka topic with 24 partitions (Kafka is on the same machine as the JobManager).
  • The filter functions and the union are useless; I use them only to check the latency they add.
  • Basically, it adds a "1" to each line; then every 2 seconds there is a tumbling window whose reduce function sums all those 1s and all the timestamps. In a later map function the summed timestamps are divided by the sum of 1s, which gives me the average creation timestamp; and in a final map function it appends to each reduced line the timestamp of the current moment and the difference between this timestamp and the average one.
  • This line is written to Kafka (to a 2-partition topic).

    //IMPORTS (Flink 1.2 package layout)
    import java.util.Properties;
    
    import org.apache.flink.api.common.functions.FilterFunction;
    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.common.functions.ReduceFunction;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.TimeCharacteristic;
    import org.apache.flink.streaming.api.datastream.AllWindowedStream;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010.FlinkKafkaProducer010Configuration;
    import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
    
    public class TimestampLongKafka {
    
        public static void main(String[] args) throws Exception {
            //FLINK CONFIGURATION
            final StreamExecutionEnvironment env = StreamExecutionEnvironment
                    .getExecutionEnvironment();
            env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
    
            //KAFKA CONSUMER CONFIGURATION
            Properties properties = new Properties();
            properties.setProperty("bootstrap.servers", "192.168.0.155:9092");
            FlinkKafkaConsumer010<String> myConsumer =
                    new FlinkKafkaConsumer010<>(args[0], new SimpleStringSchema(), properties);
    
            //KAFKA PRODUCER CONFIGURATION
            Properties producerConfig = new Properties();
            producerConfig.setProperty("bootstrap.servers", "192.168.0.155:9092");
            producerConfig.setProperty("acks", "0");
            producerConfig.setProperty("linger.ms", "0");
    
            //MAIN PROGRAM
            //Read from Kafka
            DataStream<String> line = env.addSource(myConsumer);
            //Add a 1 to each line
            DataStream<Tuple2<String, Integer>> line_Num = line.map(new NumberAdder());
            //Filter odd numbers
            DataStream<Tuple2<String, Integer>> line_Num_Odd = line_Num.filter(new FilterOdd());
            //Filter even numbers
            DataStream<Tuple2<String, Integer>> line_Num_Even = line_Num.filter(new FilterEven());
            //Union of even and odd
            DataStream<Tuple2<String, Integer>> line_Num_U = line_Num_Odd.union(line_Num_Even);
            //Tumbling windows every 2 seconds
            AllWindowedStream<Tuple2<String, Integer>, TimeWindow> windowedLine_Num_U = line_Num_U
                    .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(2)));
            //Reduce to one line with the sums
            DataStream<Tuple2<String, Integer>> wL_Num_U_Reduced = windowedLine_Num_U.reduce(new Reducer());
            //Calculate the average of the summed elements
            DataStream<String> wL_Average = wL_Num_U_Reduced.map(new AverageCalculator());
            //Add a timestamp and calculate the difference with the average
            DataStream<String> averageTS = wL_Average.map(new TimestampAdder());
    
            //Send the result to Kafka
            FlinkKafkaProducer010Configuration<String> myProducerConfig = (FlinkKafkaProducer010Configuration<String>) FlinkKafkaProducer010
                    .writeToKafkaWithTimestamps(averageTS, "testRes", new SimpleStringSchema(), producerConfig);
            myProducerConfig.setWriteTimestampToKafka(true);
    
            env.execute("TimestampLongKafka");
        }
    
        //Functions used in the program implementation:
    
        //Keeps only the lines whose counter (first field) is odd
        public static class FilterOdd implements FilterFunction<Tuple2<String, Integer>> {
            private static final long serialVersionUID = 1L;
            public boolean filter(Tuple2<String, Integer> line) throws Exception {
                return (Long.valueOf(line.f0.split(" ")[0]) % 2) != 0;
            }
        }
    
        //Keeps only the lines whose counter (first field) is even
        public static class FilterEven implements FilterFunction<Tuple2<String, Integer>> {
            private static final long serialVersionUID = 1L;
            public boolean filter(Tuple2<String, Integer> line) throws Exception {
                return (Long.valueOf(line.f0.split(" ")[0]) % 2) == 0;
            }
        }
    
        //Pairs each input line with a count of 1
        public static class NumberAdder implements MapFunction<String, Tuple2<String, Integer>> {
            private static final long serialVersionUID = 1L;
            public Tuple2<String, Integer> map(String line) {
                return new Tuple2<String, Integer>(line, 1);
            }
        }
    
        //Sums the counters and the creation timestamps, and accumulates the number of lines
        public static class Reducer implements ReduceFunction<Tuple2<String, Integer>> {
            private static final long serialVersionUID = 1L;
            public Tuple2<String, Integer> reduce(Tuple2<String, Integer> line1, Tuple2<String, Integer> line2) throws Exception {
                Long sum = Long.valueOf(line1.f0.split(" ")[0]) + Long.valueOf(line2.f0.split(" ")[0]);
                Long sumTS = Long.valueOf(line1.f0.split(" ")[1]) + Long.valueOf(line2.f0.split(" ")[1]);
                return new Tuple2<String, Integer>(sum + " " + sumTS, line1.f1 + line2.f1);
            }
        }
    
        //Divides the summed timestamps by the number of lines to get the average creation timestamp
        public static class AverageCalculator implements MapFunction<Tuple2<String, Integer>, String> {
            private static final long serialVersionUID = 1L;
            public String map(Tuple2<String, Integer> line) throws Exception {
                Long average = Long.valueOf(line.f0.split(" ")[1]) / line.f1;
                return line.f1 + " " + average;
            }
        }
    
        //Appends the current timestamp and its difference with the average creation timestamp
        public static final class TimestampAdder implements MapFunction<String, String> {
            private static final long serialVersionUID = 1L;
            public String map(String line) throws Exception {
                Long currentTime = System.currentTimeMillis();
                String totalTime = String.valueOf(currentTime - Long.valueOf(line.split(" ")[1]));
                return line.concat(" " + currentTime + " " + totalTime);
            }
        }
    }
    
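A useful way to see where the extra lines come from (an idea of mine, not part of the original program): tag each emitted record with the bounds of the window that produced it, by swapping the reduce for an AllWindowFunction. The window start/end then shows whether the duplicate lines belong to the same window or to overlapping job executions. A minimal sketch against the same Flink 1.2 API (it additionally needs org.apache.flink.streaming.api.functions.windowing.AllWindowFunction and org.apache.flink.util.Collector):

    //Diagnostic sketch (not in the original job): one line per window with its
    //[start, end) bounds and element count, to match duplicated outputs to windows
    DataStream<String> windowDebug = line_Num_U
            .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(2)))
            .apply(new AllWindowFunction<Tuple2<String, Integer>, String, TimeWindow>() {
                private static final long serialVersionUID = 1L;
                @Override
                public void apply(TimeWindow window, Iterable<Tuple2<String, Integer>> values,
                        Collector<String> out) throws Exception {
                    long count = 0;
                    for (Tuple2<String, Integer> ignored : values) {
                        count++;
                    }
                    out.collect(window.getStart() + " " + window.getEnd() + " " + count);
                }
            });

If every window start appears only once in the output topic, the duplicates come from more than one running job rather than from the window itself.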

Some output data: this output was written to the 2-partition topic, with a generation rate below 1000 records/s (**in this case it creates 3 output lines per window**):

  • 1969 1497791240910 1497791241999 1089 1497791242001 1091
  • 1973 1497791240971 1497791241999 1028 1497791242002 1031
  • 1970 1497791240937 1497791242094 1157 1497791242198 1261
  • 1917 1497791242912 1497791243999 1087 1497791244051 1139
  • 1905 1497791242971 1497791243999 1028 1497791244051 1080
  • 1916 1497791242939 1497791244096 1157 1497791244199 1260
  • 1994 1497791244915 1497791245999 1084 1497791246002 1087
  • 1993 1497791244966 1497791245999 1033 1497791246004 1038
  • 1990 1497791244939 1497791246097 1158 1497791246201 1262
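
Reading the first line, 1497791241999 - 1497791240910 = 1089 and 1497791242001 - 1497791240910 = 1091, so the fourth and sixth fields are latencies measured against the average creation timestamp in the second field; the groups of three consecutive lines with near-identical timestamps appear to be the 3 outputs of a single 2-second window.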

Thanks in advance!

I don't know the exact reason, but I was able to solve the problem by stopping the Flink cluster and starting it again. After some job executions it starts producing more outputs, 3x or more, and the problem may keep growing. I will open an issue on Jira and update here as soon as possible.
