Flink流媒体 - 延迟和吞吐量检测



我正在尝试运行flink流的作业。我想确定流过程的吞吐量和延迟。我已经启动了Kafka Broker Server并从Kafka传入消息。我如何计算每秒消息(吞吐量(?(例如rdd.count。是否有任何类似的方法来获取传入消息的计数(

(完整的景观:我已经通过生产者作为JSON对象发送了消息。我在JSON对象中添加了一些诸如字符串和system.currenttimemills之类的信息。在流中,如何通过messagestream(数据流(获得已发送的JSON对象?(

预先感谢。

代码:

/** *读取来自kafka的字符串并将其打印为标准。*/

public static void main(String[] args) throws Exception {
    System.setProperty("hadoop.home.dir", "c:/winutils/");
    // parse input argum    ents
    final ParameterTool parameterTool = ParameterTool.fromArgs(args);
    if(parameterTool.getNumberOfParameters() < 4) {
        System.out.println("Missing parameters!nUsage: Kafka --topic <topic> " +
                "--bootstrap.servers <kafka brokers> --zookeeper.connect <zk quorum> --group.id <some id>");
        return;
    }
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.getConfig().disableSysoutLogging();
    env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 10000));
    env.enableCheckpointing(5000); // create a checkpoint every 5 seconds
    env.getConfig().setGlobalJobParameters(parameterTool); // make parameters available in the web interface
    DataStream<String> messageStream = env
            .addSource(new FlinkKafkaConsumer010<>(
                    parameterTool.getRequired("topic"),
                    new SimpleStringSchema(),
                    parameterTool.getProperties()));

    messageStream.print();
    env.execute();
}

有一些指标在Flink UI中可用,您可以在其中计算每秒事件的数量和类似的内容。

您还可以根据自己的要求添加自己的指标来计算一些数字,并且可以在Flink UI中显示。

,最后是专门的延迟跟踪

此基准测试应用程序可能是一个不错的起点。关于延迟跟踪和Flink Kafka连接器可用的指标的文档也应该很有趣。

相关内容

  • 没有找到相关文章

最新更新