我正在尝试运行flink流的作业。我想确定流过程的吞吐量和延迟。我已经启动了Kafka Broker Server并从Kafka传入消息。我如何计算每秒消息(吞吐量(?(例如rdd.count。是否有任何类似的方法来获取传入消息的计数(
(完整的景观:我已经通过生产者作为JSON对象发送了消息。我在JSON对象中添加了一些诸如字符串和system.currenttimemills之类的信息。在流中,如何通过messagestream(数据流(获得已发送的JSON对象?(
预先感谢。
代码:
/** *读取来自kafka的字符串并将其打印为标准。*/
public static void main(String[] args) throws Exception {
System.setProperty("hadoop.home.dir", "c:/winutils/");
// parse input argum ents
final ParameterTool parameterTool = ParameterTool.fromArgs(args);
if(parameterTool.getNumberOfParameters() < 4) {
System.out.println("Missing parameters!nUsage: Kafka --topic <topic> " +
"--bootstrap.servers <kafka brokers> --zookeeper.connect <zk quorum> --group.id <some id>");
return;
}
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.getConfig().disableSysoutLogging();
env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 10000));
env.enableCheckpointing(5000); // create a checkpoint every 5 seconds
env.getConfig().setGlobalJobParameters(parameterTool); // make parameters available in the web interface
DataStream<String> messageStream = env
.addSource(new FlinkKafkaConsumer010<>(
parameterTool.getRequired("topic"),
new SimpleStringSchema(),
parameterTool.getProperties()));
messageStream.print();
env.execute();
}
有一些指标在Flink UI中可用,您可以在其中计算每秒事件的数量和类似的内容。
您还可以根据自己的要求添加自己的指标来计算一些数字,并且可以在Flink UI中显示。
,最后是专门的延迟跟踪
此基准测试应用程序可能是一个不错的起点。关于延迟跟踪和Flink Kafka连接器可用的指标的文档也应该很有趣。