Apache Flink: performance issues when running many jobs



For a large number of Flink SQL queries (100 copies of the query below), the Flink command-line client fails on a YARN cluster with "JobManager did not respond within 600000 ms", i.e. the job is never started on the cluster.

  • The JobManager log shows nothing after the last TaskManager started, except DEBUG entries such as "job with ID 5cd95f89ed7a66ec44f2d19eca0592f7 not found in JobManager", suggesting it is probably stuck (creating the ExecutionGraph?).
  • The same code works locally as a standalone Java program (with high CPU initially).
  • Note: every row in structStream contains 515 columns (many of which end up null), including a column that holds the raw message.
  • In the YARN cluster we allocate 18GB to each TaskManager, 18GB to the JobManager, 5 slots per TaskManager, and a parallelism of 725 (the number of partitions in our Kafka source).
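For context, the 600000 ms in the error above is the client-side submission timeout. If the JobManager is slow rather than permanently stuck while building the graph, raising that timeout is one way to test the hypothesis. A minimal sketch, assuming a Flink 1.x setup where akka.client.timeout governs how long the CLI waits for the JobManager:

# flink-conf.yaml (assumed Flink 1.x key; adjust to your release)
# how long the command-line client waits for the JobManager to respond
akka.client.timeout: 1200 s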

The Flink SQL query:

select count(*), 'idnumber' as criteria, Environment, CollectedTimestamp, 
       EventTimestamp, RawMsg, Source 
from structStream
where Environment='MyEnvironment' and Rule='MyRule' and LogType='MyLogType' 
      and Outcome='Success'
group by tumble(proctime, INTERVAL '1' SECOND), Environment, 
         CollectedTimestamp, EventTimestamp, RawMsg, Source

Code

public static void main(String[] args) throws Exception {
    FileSystems.newFileSystem(KafkaReadingStreamingJob.class
                             .getResource(WHITELIST_CSV).toURI(), new HashMap<>());
    final StreamExecutionEnvironment streamingEnvironment = getStreamExecutionEnvironment();
    final StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(streamingEnvironment);
    final DataStream<Row> structStream = getKafkaStreamOfRows(streamingEnvironment);
    tableEnv.registerDataStream("structStream", structStream);
    tableEnv.scan("structStream").printSchema();
    for (int i = 0; i < 100; i++) {
        for (String query : Queries.sample) {
            // Queries.sample contains the single query shown above.
            Table selectQuery = tableEnv.sqlQuery(query);
            DataStream<Row> selectQueryStream = tableEnv.toAppendStream(selectQuery, Row.class);
            selectQueryStream.print();
        }
    }
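    // Note: all 100 queries created in the loop above become part of ONE job
    // graph; the single execute() call below submits them together as one job.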
    // execute program
    streamingEnvironment.execute("Kafka Streaming SQL");
}
private static DataStream<Row> getKafkaStreamOfRows(StreamExecutionEnvironment environment) throws Exception {
    Properties properties = getKafkaProperties();
    // TestDeserializer deserializes the JSON to a ROW of string columns (515)
    // and also adds a column for the raw message. 
    FlinkKafkaConsumer011 consumer = new FlinkKafkaConsumer011(
            KAFKA_TOPIC_TO_CONSUME, new TestDeserializer(getRowTypeInfo()), properties);
    DataStream<Row> stream = environment.addSource(consumer);
    return stream;
}
private static RowTypeInfo getRowTypeInfo() throws Exception {
    // This has 515 fields. 
    List<String> fieldNames = DDIManager.getDDIFieldNames();
    fieldNames.add("rawkafka"); // rawMessage added by TestDeserializer
    fieldNames.add("proctime");
    // Fill typeInformationArray with String type for all but the last field,
    // which is of type Time.
    // ...
    return new RowTypeInfo(typeInformationArray, fieldNamesArray);
}
private static StreamExecutionEnvironment getStreamExecutionEnvironment() throws IOException {
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
    env.enableCheckpointing(60000);
    env.setStateBackend(new FsStateBackend(CHECKPOINT_DIR));
    env.setParallelism(725);
    return env;
}
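With env.setParallelism(725), every operator of each of the 100 queries defaults to 725 parallel subtasks, so the single job graph contains on the order of 100 x 725 subtasks per operator stage; that alone can make ExecutionGraph construction very expensive. A minimal sketch of one mitigation, capping at least the sink parallelism per query (setParallelism on the DataStreamSink returned by print() is plain DataStream API; whether this shrinks the graph enough here is untested):

for (String query : Queries.sample) {
    Table selectQuery = tableEnv.sqlQuery(query);
    DataStream<Row> selectQueryStream = tableEnv.toAppendStream(selectQuery, Row.class);
    // print() returns a DataStreamSink; capping its parallelism means each
    // query adds one print subtask instead of 725 of them
    selectQueryStream.print().setParallelism(1);
}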

To me this looks as if the JobManager is overloaded by too many concurrently running jobs. I would suggest distributing the jobs across more JobManagers / Flink clusters.
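One way to act on that suggestion is to parameterize the program with a batch index and launch each batch through its own flink run invocation, so every JobManager only has to build a small ExecutionGraph. A sketch under the assumption that Queries.sample is a List<String>; the batchSize of 10 and the args[0] convention are hypothetical:

public static void main(String[] args) throws Exception {
    int batch = Integer.parseInt(args[0]); // hypothetical: batch index, one per `flink run`
    int batchSize = 10;                    // hypothetical: queries per job
    final StreamExecutionEnvironment env = getStreamExecutionEnvironment();
    final StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
    tableEnv.registerDataStream("structStream", getKafkaStreamOfRows(env));
    List<String> queries = Queries.sample.subList(
            batch * batchSize, Math.min((batch + 1) * batchSize, Queries.sample.size()));
    for (String query : queries) {
        tableEnv.toAppendStream(tableEnv.sqlQuery(query), Row.class).print();
    }
    env.execute("Kafka Streaming SQL, batch " + batch); // one independent job per invocation
}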

Latest update